Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 23:37:29 UTC

[1/5] flink git commit: [FLINK-2587] [streaming] Make sure that close() is not called while checkpoint methods are in progress.

Repository: flink
Updated Branches:
  refs/heads/master f5016439b -> 0807eec0c


[FLINK-2587] [streaming] Make sure that close() is not called while checkpoint methods are in progress.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74ed7145
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74ed7145
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74ed7145

Branch: refs/heads/master
Commit: 74ed714534c832de4376f7acbd4896c7a93273de
Parents: f501643
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 27 19:33:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 19:33:00 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer.java    | 82 +++++++++++---------
 .../streaming/runtime/tasks/StreamTask.java     | 14 +++-
 2 files changed, 56 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74ed7145/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index d94f29b..8066b3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -497,48 +497,56 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			LOG.debug("Committing offsets externally for checkpoint {}", checkpointId);
 		}
 
-		long[] checkpointOffsets;
-
-		// the map may be asynchronously updates when snapshotting state, so we synchronize
-		synchronized (pendingCheckpoints) {
-			final int posInMap = pendingCheckpoints.indexOf(checkpointId);
-			if (posInMap == -1) {
-				LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
-				return;
+		try {
+			long[] checkpointOffsets;
+	
+			// the map may be asynchronously updated when snapshotting state, so we synchronize
+			synchronized (pendingCheckpoints) {
+				final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+				if (posInMap == -1) {
+					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+					return;
+				}
+	
+				checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+				
+				// remove older checkpoints in map
+				for (int i = 0; i < posInMap; i++) {
+					pendingCheckpoints.remove(0);
+				}
 			}
-
-			checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
-			
-			// remove older checkpoints in map
-			for (int i = 0; i < posInMap; i++) {
-				pendingCheckpoints.remove(0);
+	
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
 			}
-		}
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Committing offsets {} to offset store: {}", Arrays.toString(checkpointOffsets), offsetStore);
-		}
-
-		// build the map of (topic,partition) -> committed offset
-		Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
-		for (TopicPartition tp : subscribedPartitions) {
-			
-			int partition = tp.partition();
-			long offset = checkpointOffsets[partition];
-			long lastCommitted = commitedOffsets[partition];
-			
-			if (offset != OFFSET_NOT_SET) {
-				if (offset > lastCommitted) {
-					offsetsToCommit.put(tp, offset);
-					LOG.debug("Committing offset {} for partition {}", offset, partition);
-				}
-				else {
-					LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+	
+			// build the map of (topic,partition) -> committed offset
+			Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
+			for (TopicPartition tp : subscribedPartitions) {
+				
+				int partition = tp.partition();
+				long offset = checkpointOffsets[partition];
+				long lastCommitted = commitedOffsets[partition];
+				
+				if (offset != OFFSET_NOT_SET) {
+					if (offset > lastCommitted) {
+						offsetsToCommit.put(tp, offset);
+						LOG.debug("Committing offset {} for partition {}", offset, partition);
+					}
+					else {
+						LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+					}
 				}
 			}
+			
+			offsetHandler.commit(offsetsToCommit);
+		}
+		catch (Exception e) {
+			if (running) {
+				throw e;
+			}
+			// else ignore exception if we are no longer running
 		}
-		
-		offsetHandler.commit(offsetsToCommit);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/74ed7145/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8a5f741..ffd04e6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -175,6 +175,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 				LOG.debug("Finished task {}", getName());
 			}
 
+			// make sure no further checkpoint and notification actions happen
+			// for that we set this task as not running and make sure no other thread is
+			// currently in the locked scope before we close the operators
+			this.isRunning = false;
+			synchronized (checkpointLock) {}
+			
 			// this is part of the main logic, so if this fails, the task is considered failed
 			closeAllOperators();
 			operatorOpen = false;
@@ -333,9 +339,11 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		synchronized (checkpointLock) {
-			for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
-				if (chainedOperator instanceof StatefulStreamOperator) {
-					((StatefulStreamOperator<?>) chainedOperator).notifyCheckpointComplete(checkpointId);
+			if (isRunning) {
+				for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
+					if (chainedOperator instanceof StatefulStreamOperator) {
+						((StatefulStreamOperator<?>) chainedOperator).notifyCheckpointComplete(checkpointId);
+					}
 				}
 			}
 		}
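
The change to StreamTask above boils down to a small hand-off pattern: the task first flips its running flag and then passes once through the checkpoint lock, so that any checkpoint callback still inside the locked scope finishes before close() is called on the operators. A minimal standalone sketch of that pattern (only `isRunning` and `checkpointLock` mirror the diff; the class and method bodies are illustrative, not Flink API):

    public class CheckpointCloseHandOff {

        private final Object checkpointLock = new Object();
        private volatile boolean isRunning = true;

        // called by the checkpoint coordinator thread
        public void notifyCheckpointComplete(long checkpointId) {
            synchronized (checkpointLock) {
                if (isRunning) {
                    // safe to touch operator state here: close() has not run yet
                    // and cannot start while this thread holds the lock
                    System.out.println("committing state for checkpoint " + checkpointId);
                }
            }
        }

        // called by the task thread when the task finishes
        public void shutdown() {
            // 1) no new checkpoint work may start after this point
            isRunning = false;
            // 2) empty locked scope: lets a callback that is currently inside
            //    the lock leave before the operators are closed
            synchronized (checkpointLock) {}
            System.out.println("closing operators");
        }
    }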


[3/5] flink git commit: [FLINK-2582] [docs] Add documentation how to build Flink against different Scala versions

Posted by se...@apache.org.
[FLINK-2582] [docs] Add documentation how to build Flink against different Scala versions

This closes #1070


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc63ef2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc63ef2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc63ef2c

Branch: refs/heads/master
Commit: bc63ef2c122a842bf2a98eb5813d16b5d43fa19d
Parents: 8cf8736
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 27 15:16:10 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 19:47:37 2015 +0200

----------------------------------------------------------------------
 docs/_includes/navbar.html |  2 +-
 docs/setup/building.md     | 33 ++++++++++++++++++++++++++++++---
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc63ef2c/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index 26ad740..84cc6bc 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -44,7 +44,7 @@ under the License.
             <li class="dropdown{% if page.url contains setup %} active{% endif %}">
               <a href="{{ setup }}" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
               <ul class="dropdown-menu" role="menu">
-                <li><a href="{{ setup }}/building.html">Get Flink {{ site.version }}</a></li>
+                <li><a href="{{ setup }}/building.html">Build Flink {{ site.version }}</a></li>
 
                 <li class="divider"></li>
                 <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/bc63ef2c/docs/setup/building.md
----------------------------------------------------------------------
diff --git a/docs/setup/building.md b/docs/setup/building.md
index 2fcf412..2581812 100644
--- a/docs/setup/building.md
+++ b/docs/setup/building.md
@@ -20,7 +20,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-In order to build Flink, you need the source code. Either download the source of a release or clone the git repository. In addition to that, you need Maven 3 and a JDK (Java Development Kit). Note that you can not build Flink with Oracle JDK 6 due to a unresolved bug in the Oracle Java compiler. It works well with OpenJDK 6 and all Java 7 and 8 compilers.
+In order to build Flink, you need the source code. Either download the source of a release or clone the git repository. In addition to that, you need Maven 3 and a JDK (Java Development Kit).
+Flink requires at least Java 7 to build. We recommend using Java 8.
 
 To clone from git, enter:
 
@@ -48,8 +49,8 @@ This section covers building Flink for a specific Hadoop version. Most users do
 The problem is that Flink uses HDFS and YARN which are both dependencies from Apache Hadoop. There exist many different versions of Hadoop (from both the upstream project and the different Hadoop distributions). If a user is using a wrong combination of versions, exceptions like this one occur:
 
 ~~~bash
-ERROR: The job was not successfully submitted to the nephele job manager:
-    org.apache.flink.nephele.executiongraph.GraphConversionException: Cannot compute input splits for TSV:
+ERROR: Job execution failed.
+    org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'TextInputFormat(/my/path)':
     java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException:
     Protocol message contained an invalid tag (zero).; Host Details :
 ~~~
@@ -95,6 +96,32 @@ So if you are building Flink for Hadoop `2.0.0-alpha`, use the following command
 -P!include-yarn -Dhadoop.version=2.0.0-alpha
 ~~~
 
+
+## Build Flink for a specific Scala Version
+
+**Note:** Users that purely use the Java APIs and libraries can ignore this section.
+
+Flink has APIs, libraries, and runtime modules written in [Scala](http://scala-lang.org). Users of the Scala API and libraries may have to match the Scala version of Flink with the Scala version
+of their projects (because Scala is not strictly backwards compatible).
+
+By default, Flink is built with Scala *2.10*. To build Flink with Scala *2.11*, append the `-Dscala-2.11` option to your build command:
+
+~~~bash
+mvn clean install -DskipTests -Dscala-2.11
+~~~
+
+
+To build against custom Scala versions, you need to supply the *language version* and the *binary version* as properties to the build:
+
+~~~bash
+mvn clean install -DskipTests -Dscala.version=2.11.4 -Dscala.binary.version=2.11
+~~~
+
+Flink is developed against Scala *2.10*, and tested additionally against Scala *2.11*. These two versions are known to be compatible. Earlier versions (like Scala *2.9*) are *not* compatible.
+
+Newer versions may be compatible, depending on breaking changes in the language features used by Flink, and the availability of Flink's dependencies in those Scala versions. The dependencies written in Scala include for example *Kafka*, *Akka*, *Scalatest*, and *scopt*.
+
+
 ## Background
 
 The builds with Maven are controlled by [properties](http://maven.apache.org/pom.html#Properties) and <a href="http://maven.apache.org/guides/introduction/introduction-to-profiles.html">build profiles</a>.


[2/5] flink git commit: [FLINK-2326] [yarn] Write yarn properties file to temp directory

Posted by se...@apache.org.
[FLINK-2326] [yarn] Write yarn properties file to temp directory

This closes #1062


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cf8736a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cf8736a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cf8736a

Branch: refs/heads/master
Commit: 8cf8736ad99c78aa419ee2d6be4b70ac1b29d0ec
Parents: 74ed714
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Aug 26 16:11:30 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 19:41:39 2015 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  6 +++
 .../org/apache/flink/client/CliFrontend.java    |  8 +++-
 .../flink/client/FlinkYarnSessionCli.java       | 41 ++++++++++-------
 .../CliFrontendAddressConfigurationTest.java    | 31 ++++++++++++-
 .../testconfigwithyarn/flink-conf.yaml          |  1 -
 .../flink/configuration/ConfigConstants.java    |  9 ++++
 .../runtime/yarn/AbstractFlinkYarnCluster.java  | 47 +++++++++++++++++++-
 .../runtime/yarn/FlinkYarnClusterStatus.java    | 10 +++--
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 12 +++--
 9 files changed, 136 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 53b9ae0..e2ffda6 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -351,6 +351,12 @@ to set the JM host:port manually. It is recommended to leave this option at 1.
 
 - `yarn.heartbeat-delay` (Default: 5 seconds). Time between heartbeats with the ResourceManager.
 
+- `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, 
+the JobManager's host and the number of available processing slots are written into a properties file, 
+so that the Flink client is able to pick those details up. This configuration parameter allows 
+changing the default location of that file (for example, for environments sharing a Flink 
+installation between users).
+
 ## Background
 
 ### Configuring the Network Buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 9ef2d5f..ea1a6e9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -103,7 +103,7 @@ public class CliFrontend {
 	private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 
 	// YARN-session related constants
-	public static final String YARN_PROPERTIES_FILE = ".yarn-properties";
+	public static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
 	public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
 	public static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
 	public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
@@ -162,7 +162,11 @@ public class CliFrontend {
 		this.config = GlobalConfiguration.getConfiguration();
 
 		// load the YARN properties
-		File propertiesFile = new File(configDirectory, YARN_PROPERTIES_FILE);
+		String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
+		String currentUser = System.getProperty("user.name");
+		String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+
+		File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser);
 		if (propertiesFile.exists()) {
 
 			logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 83993f2..66a4834 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
@@ -145,13 +146,13 @@ public class FlinkYarnSessionCli {
 
 		flinkYarnClient.setConfigurationFilePath(confPath);
 
-		List<File> shipFiles = new ArrayList<File>();
+		List<File> shipFiles = new ArrayList<>();
 		// path to directory to ship
 		if (cmd.hasOption(SHIP_PATH.getOpt())) {
 			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
 			File shipDir = new File(shipPath);
 			if (shipDir.isDirectory()) {
-				shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
+				shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
 					@Override
 					public boolean accept(File dir, String name) {
 						return !(name.equals(".") || name.equals(".."));
@@ -215,7 +216,7 @@ public class FlinkYarnSessionCli {
 		flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
 		if (cmd.hasOption(DETACHED.getOpt())) {
-			detachedMode = true;
+			this.detachedMode = true;
 			flinkYarnClient.setDetachedMode(detachedMode);
 		}
 
@@ -254,7 +255,7 @@ public class FlinkYarnSessionCli {
 	}
 
 	public static AbstractFlinkYarnClient getFlinkYarnClient() {
-		AbstractFlinkYarnClient yarnClient = null;
+		AbstractFlinkYarnClient yarnClient;
 		try {
 			Class<? extends AbstractFlinkYarnClient> yarnClientClass =
 					Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
@@ -288,20 +289,21 @@ public class FlinkYarnSessionCli {
 		int numTaskmanagers = 0;
 		try {
 			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+			label:
 			while (true) {
 				// ------------------ check if there are updates by the cluster -----------
 
 				FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
 				if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
 					System.err.println("Number of connected TaskManagers changed to " +
-							status.getNumberOfTaskManagers() + ". Slots available: " +  status.getNumberOfSlots());
+							status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots());
 					numTaskmanagers = status.getNumberOfTaskManagers();
 				}
 
 				List<String> messages = yarnCluster.getNewMessages();
 				if (messages != null && messages.size() > 0) {
 					System.err.println("New messages from the YARN cluster: ");
-					for(String msg : messages) {
+					for (String msg : messages) {
 						System.err.println(msg);
 					}
 				}
@@ -321,12 +323,17 @@ public class FlinkYarnSessionCli {
 
 				if (in.ready()) {
 					String command = in.readLine();
-					if (command.equals("quit") || command.equals("stop")) {
-						break; // leave loop, cli will stop cluster.
-					} else if (command.equals("help"))  {
-						System.err.println(HELP);
-					} else {
-						System.err.println("Unknown command '"+command+"'. Showing help: \n"+HELP);
+					switch (command) {
+						case "quit":
+						case "stop":
+							break label;
+
+						case "help":
+							System.err.println(HELP);
+							break;
+						default:
+							System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP);
+							break;
 					}
 				}
 				if (yarnCluster.hasBeenStopped()) {
@@ -380,7 +387,7 @@ public class FlinkYarnSessionCli {
 		// Query cluster for metrics
 		if (cmd.hasOption(QUERY.getOpt())) {
 			AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-			String description = null;
+			String description;
 			try {
 				description = flinkYarnClient.getClusterDescription();
 			} catch (Exception e) {
@@ -415,8 +422,12 @@ public class FlinkYarnSessionCli {
 			System.out.println("Flink JobManager is now running on " + jobManagerAddress);
 			System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
 			// file that we write into the conf/ dir containing the jobManager address and the dop.
-			String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
-			File yarnPropertiesFile = new File(confDirPath + File.separator + CliFrontend.YARN_PROPERTIES_FILE);
+
+			String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
+			String currentUser = System.getProperty("user.name");
+			String propertiesFileLocation = yarnCluster.getFlinkConfiguration().getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+
+			File yarnPropertiesFile = new File(propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser);
 
 			Properties yarnProps = new Properties();
 			yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 7b0dd2b..2d41374 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -24,19 +24,27 @@ import static org.junit.Assert.fail;
 
 import static org.mockito.Mockito.*;
 
+import java.io.File;
 import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 
 import org.apache.flink.client.cli.CommandLineOptions;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Tests that verify that the CLI client picks up the correct address for the JobManager
  * from configuration and configs.
  */
 public class CliFrontendAddressConfigurationTest {
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
 	
 	@BeforeClass
 	public static void init() {
@@ -101,11 +109,30 @@ public class CliFrontendAddressConfigurationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Tests that the CliFrontend is able to pick up the .yarn-properties file from a specified location.
+	 */
 	@Test
 	public void testYarnConfig() {
 		try {
-			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
+			File tmpFolder = folder.newFolder();
+			String currentUser = System.getProperty("user.name");
+
+			// copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
+			File confFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile());
+			File testConfFile = new File(tmpFolder, "flink-conf.yaml");
+			org.apache.commons.io.FileUtils.copyFile(confFile, testConfFile);
+			String toAppend = "\nyarn.properties-file.location: " + tmpFolder;
+			// append to flink-conf.yaml
+			Files.write(testConfFile.toPath(), toAppend.getBytes(), StandardOpenOption.APPEND);
+			// copy .yarn-properties-<username>
+			File propertiesFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/.yarn-properties").getFile());
+			File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
+			org.apache.commons.io.FileUtils.copyFile(propertiesFile, testPropertiesFile);
+
+			// start CLI Frontend
+			CliFrontend frontend = new CliFrontend(tmpFolder.getAbsolutePath());
 
 			CommandLineOptions options = mock(CommandLineOptions.class);
 			InetSocketAddress address = frontend.getJobManagerAddress(options);

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml b/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml
index 084c71e..9e5de34 100644
--- a/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml
+++ b/flink-clients/src/test/resources/testconfigwithyarn/flink-conf.yaml
@@ -23,4 +23,3 @@
 jobmanager.rpc.address: 192.168.1.33
 
 jobmanager.rpc.port: 55443
-

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2ca7c36..355da2d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -249,6 +249,15 @@ public final class ConfigConstants {
 	 */
 	public static final String YARN_HEARTBEAT_DELAY_SECONDS = "yarn.heartbeat-delay";
 
+	/**
+	 * When a Flink job is submitted to YARN, the JobManager's host and the number of available
+	 * processing slots are written into a properties file, so that the Flink client is able
+	 * to pick those details up.
+	 * This configuration parameter allows changing the default location of that file (for example,
+	 * for environments sharing a Flink installation between users).
+	 */
+	public static final String YARN_PROPERTIES_FILE_LOCATION = "yarn.properties-file.location";
+
 
 	// ------------------------ Hadoop Configuration ------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
index c2e897f..3f78898 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -19,23 +19,48 @@
 package org.apache.flink.runtime.yarn;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 
+/**
+ * Abstract class for interacting with a running Flink cluster within YARN.
+ */
 public abstract class AbstractFlinkYarnCluster {
 
+	/**
+	 * Get hostname and port of the JobManager.
+	 */
 	public abstract InetSocketAddress getJobManagerAddress();
 
+	/**
+	 * Returns an URL (as a string) to the JobManager web interface, running next to the
+	 * ApplicationMaster and JobManager in a YARN container
+	 */
 	public abstract String getWebInterfaceURL();
 
+	/**
+	 * Request the YARN cluster to shut down.
+	 *
+	 * @param failApplication If true, the application will be marked as failed in YARN
+	 */
 	public abstract void shutdown(boolean failApplication);
 
+	/**
+	 * Boolean indicating whether the cluster has been stopped already
+	 */
 	public abstract boolean hasBeenStopped();
 
+	/**
+	 * Returns the latest cluster status, with number of Taskmanagers and slots
+	 */
 	public abstract FlinkYarnClusterStatus getClusterStatus();
 
+	/**
+	 * Boolean indicating whether the Flink YARN cluster is in an erroneous state.
+	 */
 	public abstract boolean hasFailed();
 
 	/**
@@ -43,10 +68,24 @@ public abstract class AbstractFlinkYarnCluster {
 	 */
 	public abstract String getDiagnostics();
 
+	/**
+	 * May return new messages from the cluster.
+	 * Messages can be for example about failed containers or container launch requests.
+	 */
 	public abstract List<String> getNewMessages();
 
+	/**
+	 * Returns a string representation of the ApplicationID assigned by YARN.
+	 */
 	public abstract String getApplicationId();
 
+	/**
+	 * Flink's YARN cluster abstraction has two modes for connecting to the YARN AM.
+	 * In the detached mode, the AM is launched and the Flink YARN client is disconnecting
+	 * afterwards.
+	 * In the non-detached mode, it maintains a connection with the AM to control the cluster.
+	 * @return boolean indicating whether the cluster is a detached cluster
+	 */
 	public abstract boolean isDetached();
 
 	/**
@@ -69,7 +108,13 @@ public abstract class AbstractFlinkYarnCluster {
 	 * Tells the ApplicationMaster to monitor the status of JobId and stop itself once the specified
 	 * job has finished.
 	 *
-	 * @param jobID
+	 * @param jobID Id of the job
 	 */
 	public abstract void stopAfterJob(JobID jobID);
+
+	/**
+	 * Return the Flink configuration object
+	 * @return The Flink configuration object
+	 */
+	public abstract Configuration getFlinkConfiguration();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
index 2aaaaa0..7eb3e1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.yarn;
 import java.io.Serializable;
 
 
+/**
+ * Simple status representation of a running YARN cluster.
+ * It contains the number of available Taskmanagers and processing slots.
+ */
 public class FlinkYarnClusterStatus implements Serializable {
+	private static final long serialVersionUID = 4230348124179245370L;
 	private int numberOfTaskManagers;
 	private int numberOfSlots;
 
@@ -62,11 +67,8 @@ public class FlinkYarnClusterStatus implements Serializable {
 		if (numberOfSlots != that.numberOfSlots) {
 			return false;
 		}
-		if (numberOfTaskManagers != that.numberOfTaskManagers) {
-			return false;
-		}
+		return numberOfTaskManagers == that.numberOfTaskManagers;
 
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8cf8736a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 5fa3ac7..56be198 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -220,9 +220,8 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 
 	// -------------------------- Interaction with the cluster ------------------------
 
-	/**
+	/*
 	 * This call blocks until the message has been recevied.
-	 * @param jobID
 	 */
 	@Override
 	public void stopAfterJob(JobID jobID) {
@@ -235,6 +234,11 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	}
 
 	@Override
+	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+		return flinkConfig;
+	}
+
+	@Override
 	public InetSocketAddress getJobManagerAddress() {
 		return jobManagerAddress;
 	}
@@ -265,10 +269,10 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	@Override
 	public FlinkYarnClusterStatus getClusterStatus() {
 		if(!isConnected) {
-			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
+			throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
 		}
 		if(hasBeenStopped()) {
-			throw new RuntimeException("The FlinkYarnCluster has alread been stopped");
+			throw new RuntimeException("The FlinkYarnCluster has already been stopped");
 		}
 		Future<Object> clusterStatusOption = ask(applicationClient, Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout);
 		Object clusterStatus;
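
For reference, the properties-file path that CliFrontend and FlinkYarnSessionCli now both compute can be read as the following small helper; it is a sketch assuming the `.yarn-properties-` prefix and the `yarn.properties-file.location` key from the diff, while the class itself is illustrative:

    import java.io.File;

    public final class YarnPropertiesFileSketch {

        // mirrors CliFrontend.YARN_PROPERTIES_FILE after this change
        private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";

        // configuredLocation is the value of "yarn.properties-file.location",
        // or null when the key is not set in flink-conf.yaml
        static File resolve(String configuredLocation) {
            String defaultLocation = System.getProperty("java.io.tmpdir");
            String location = (configuredLocation != null) ? configuredLocation : defaultLocation;
            String currentUser = System.getProperty("user.name");
            // e.g. /tmp/.yarn-properties-alice
            return new File(location, YARN_PROPERTIES_FILE + currentUser);
        }
    }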


[4/5] flink git commit: [FLINK-2556] [core] Refactor/fix pre-flight key validation

Posted by se...@apache.org.
[FLINK-2556] [core] Refactor/fix pre-flight key validation

This closes #1044


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e38d6fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e38d6fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e38d6fa

Branch: refs/heads/master
Commit: 1e38d6fad410c0a0ef82fa73617f6feda86cd0e0
Parents: bc63ef2
Author: zentol <s....@web.de>
Authored: Sun Aug 23 15:57:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 20:04:57 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/operators/DistinctOperator.java   |  7 -------
 .../org/apache/flink/api/java/operators/Keys.java    | 15 +++++++++++++++
 .../flink/api/java/sca/UdfAnalyzerExamplesTest.java  |  4 ++--
 3 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index a6eb43e..ad2335b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
@@ -26,9 +25,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
@@ -54,10 +51,6 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 		this.distinctLocationName = distinctLocationName;
 
-		if (!(input.getType() instanceof CompositeType) &&
-				!(input.getType() instanceof AtomicType && input.getType().isKeyType())){
-			throw new InvalidProgramException("Distinct only possible on composite or atomic key types.");
-		}
 		// if keys is null distinction is done on all fields
 		if (keys == null) {
 			keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());

http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 09874e5..47c66f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -209,6 +209,9 @@ public abstract class Keys<T> {
 				throw new InvalidProgramException("Specifying keys via field positions is only valid " +
 						"for tuple data types. Type: " + type);
 			}
+			if (type.getArity() == 0) {
+				throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
+			}
 
 			if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) {
 				throw new IllegalArgumentException("The grouping fields must not be empty.");
@@ -240,6 +243,9 @@ public abstract class Keys<T> {
 					}
 					else {
 						// arrived at key position
+						if (!fieldType.isKeyType()) {
+							throw new InvalidProgramException("This type (" + fieldType + ") cannot be used as key.");
+						}
 						if(fieldType instanceof CompositeType) {
 							// add all nested fields of composite type
 							((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
@@ -296,6 +302,15 @@ public abstract class Keys<T> {
 				keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
 				for (int i = 0; i < expressions.length; i++) {
 					List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
+					for (FlatFieldDescriptor key : keys) {
+						TypeInformation<?> keyType = key.getType();
+						if (!keyType.isKeyType()) {
+							throw new InvalidProgramException("This type (" + key.getType() + ") cannot be used as key.");
+						}
+						if (!(keyType instanceof AtomicType || keyType instanceof CompositeType)) {
+							throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + keyType);
+						}
+					}
 					if(keys.size() == 0) {
 						throw new InvalidProgramException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index a1d2b97..5254b68 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -529,8 +529,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testLogisticRegressionExamplesSumGradient() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
-				"Tuple1<double[]>",
-				"Tuple1<double[]>",
+				"Tuple1<double>",
+				"Tuple1<double>",
 				new String[] { "0" });
 	}
 


[5/5] flink git commit: [FLINK-2565] Support primitive arrays as keys

Posted by se...@apache.org.
[FLINK-2565] Support primitive arrays as keys

This closes #1043


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0807eec0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0807eec0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0807eec0

Branch: refs/heads/master
Commit: 0807eec0cb1acf8052a77b6133387e25399fce08
Parents: 1e38d6f
Author: zentol <s....@web.de>
Authored: Sun Aug 23 15:36:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 20:17:10 2015 +0200

----------------------------------------------------------------------
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  47 +++++--
 .../array/BooleanPrimitiveArrayComparator.java  |  56 +++++++++
 .../array/BytePrimitiveArrayComparator.java     |  56 +++++++++
 .../array/CharPrimitiveArrayComparator.java     |  56 +++++++++
 .../array/DoublePrimitiveArrayComparator.java   |  57 +++++++++
 .../array/FloatPrimitiveArrayComparator.java    |  56 +++++++++
 .../base/array/IntPrimitiveArrayComparator.java |  56 +++++++++
 .../array/LongPrimitiveArrayComparator.java     |  56 +++++++++
 .../base/array/PrimitiveArrayComparator.java    | 121 +++++++++++++++++++
 .../array/ShortPrimitiveArrayComparator.java    |  56 +++++++++
 .../BooleanPrimitiveArrayComparatorTest.java    |  45 +++++++
 .../array/BytePrimitiveArrayComparatorTest.java |  44 +++++++
 .../array/CharPrimitiveArrayComparatorTest.java |  42 +++++++
 .../DoublePrimitiveArrayComparatorTest.java     |  44 +++++++
 .../FloatPrimitiveArrayComparatorTest.java      |  44 +++++++
 .../array/IntPrimitiveArrayComparatorTest.java  |  44 +++++++
 .../array/LongPrimitiveArrayComparatorTest.java |  44 +++++++
 .../array/PrimitiveArrayComparatorTestBase.java |  41 +++++++
 .../ShortPrimitiveArrayComparatorTest.java      |  44 +++++++
 .../flink/api/java/operator/GroupingTest.java   |  14 ++-
 .../javaApiOperators/GroupReduceITCase.java     |  31 +++++
 .../util/CollectionDataSets.java                |  18 +++
 22 files changed, 1057 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
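
With this change, primitive arrays can be used directly as grouping keys in the Java API. A small usage sketch of what is now possible (the data values are made up for illustration):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class PrimitiveArrayKeySketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // tuples keyed by an int[] field; before this commit grouping on field 0 was rejected
            DataSet<Tuple2<int[], Integer>> data = env.fromElements(
                    new Tuple2<>(new int[]{1, 2}, 10),
                    new Tuple2<>(new int[]{1, 2}, 32),
                    new Tuple2<>(new int[]{3}, 7));

            // group by the primitive array key and sum the counts per distinct array contents
            data.groupBy(0).sum(1).print();
        }
    }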


http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 83126ab..3843f28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -24,13 +24,22 @@ import java.util.Map;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator;
+import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
 
 /**
@@ -39,19 +48,18 @@ import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySeria
  *
  * @param <T> The type represented by this type information, e.g., int[], double[], long[]
  */
-public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
-	
+public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
+
 	private static final long serialVersionUID = 1L;
 
-	public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE);
-	
+	public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE, BooleanPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE, BytePrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE, ShortPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE, IntPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE, LongPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE, FloatPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE, DoublePrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE, CharPrimitiveArrayComparator.class);
 	// --------------------------------------------------------------------------------------------
 	
 	/** The class of the array (such as int[].class) */
@@ -60,12 +68,15 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 	/** The serializer for the array */
 	private final TypeSerializer<T> serializer;
 
+	/** The class of the comparator for the array */
+	private Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass;
+
 	/**
 	 * Creates a new type info for a 
 	 * @param arrayClass The class of the array (such as int[].class)
 	 * @param serializer The serializer for the array.
 	 */
-	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer) {
+	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer, Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass) {
 		if (arrayClass == null || serializer == null) {
 			throw new NullPointerException();
 		}
@@ -74,6 +85,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 		}
 		this.arrayClass = arrayClass;
 		this.serializer = serializer;
+		this.comparatorClass = comparatorClass;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -105,7 +117,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 
 	@Override
 	public boolean isKeyType() {
-		return false;
+		return true;
 	}
 
 	@Override
@@ -161,4 +173,13 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 		TYPES.put(double[].class, DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
 		TYPES.put(char[].class, CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
 	}
+
+	@Override
+	public PrimitiveArrayComparator<T, ?> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+		try {
+			return comparatorClass.getConstructor(boolean.class).newInstance(sortOrderAscending);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not initialize primitive " + comparatorClass.getName() + " array comparator.", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
new file mode 100644
index 0000000..b7487b8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.BooleanComparator;
+
+public class BooleanPrimitiveArrayComparator extends PrimitiveArrayComparator<boolean[], BooleanComparator> {
+	public BooleanPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new BooleanComparator(ascending));
+	}
+
+	@Override
+	public int hash(boolean[] record) {
+		int result = 0;
+		for (boolean field : record) {
+			result += field ? 1231 : 1237;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(boolean[] first, boolean[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = (second[x] == first[x] ? 0 : (first[x] ? 1 : -1));
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<boolean[]> duplicate() {
+		BooleanPrimitiveArrayComparator dupe = new BooleanPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
new file mode 100644
index 0000000..d914c3e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.ByteComparator;
+
+public class BytePrimitiveArrayComparator extends PrimitiveArrayComparator<byte[], ByteComparator> {
+	public BytePrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new ByteComparator(ascending));
+	}
+
+	@Override
+	public int hash(byte[] record) {
+		int result = 0;
+		for (byte field : record) {
+			result += (int) field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(byte[] first, byte[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] - second[x];
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<byte[]> duplicate() {
+		BytePrimitiveArrayComparator dupe = new BytePrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
new file mode 100644
index 0000000..d734152
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.CharComparator;
+
+public class CharPrimitiveArrayComparator extends PrimitiveArrayComparator<char[], CharComparator> {
+	public CharPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new CharComparator(ascending));
+	}
+
+	@Override
+	public int hash(char[] record) {
+		int result = 0;
+		for (char field : record) {
+			result += (int) field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(char[] first, char[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] - second[x];
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<char[]> duplicate() {
+		CharPrimitiveArrayComparator dupe = new CharPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
new file mode 100644
index 0000000..5153fa5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+
+public class DoublePrimitiveArrayComparator extends PrimitiveArrayComparator<double[], DoubleComparator> {
+	public DoublePrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new DoubleComparator(ascending));
+	}
+
+	@Override
+	public int hash(double[] record) {
+		int result = 0;
+		for (double field : record) {
+			long bits = Double.doubleToLongBits(field);
+			result += (int) (bits ^ (bits >>> 32));
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(double[] first, double[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = Double.compare(first[x], second[x]);
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<double[]> duplicate() {
+		DoublePrimitiveArrayComparator dupe = new DoublePrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
new file mode 100644
index 0000000..5a5986e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.FloatComparator;
+
+public class FloatPrimitiveArrayComparator extends PrimitiveArrayComparator<float[], FloatComparator> {
+	public FloatPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new FloatComparator(ascending));
+	}
+
+	@Override
+	public int hash(float[] record) {
+		int result = 0;
+		for (float field : record) {
+			result += Float.floatToIntBits(field);
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(float[] first, float[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = Float.compare(first[x], second[x]);
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<float[]> duplicate() {
+		FloatPrimitiveArrayComparator dupe = new FloatPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
new file mode 100644
index 0000000..78cb2ae
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+
+public class IntPrimitiveArrayComparator extends PrimitiveArrayComparator<int[], IntComparator> {
+	public IntPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new IntComparator(ascending));
+	}
+
+	@Override
+	public int hash(int[] record) {
+		int result = 0;
+		for (int field : record) {
+			result += field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(int[] first, int[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] < second[x] ? -1 : (first[x] == second[x] ? 0 : 1);
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<int[]> duplicate() {
+		IntPrimitiveArrayComparator dupe = new IntPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
new file mode 100644
index 0000000..c0a69bc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+
+public class LongPrimitiveArrayComparator extends PrimitiveArrayComparator<long[], LongComparator> {
+	public LongPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new LongComparator(ascending));
+	}
+
+	@Override
+	public int hash(long[] record) {
+		int result = 0;
+		for (long field : record) {
+			result += (int) (field ^ (field >>> 32));
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(long[] first, long[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] < second[x] ? -1 : (first[x] == second[x] ? 0 : 1);
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<long[]> duplicate() {
+		LongPrimitiveArrayComparator dupe = new LongPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
new file mode 100644
index 0000000..ba53aff
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import java.io.IOException;
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.BasicTypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+public abstract class PrimitiveArrayComparator<T, C extends BasicTypeComparator> extends TypeComparator<T> {
+	// For use by getComparators
+	@SuppressWarnings("rawtypes")
+	private final TypeComparator[] comparators = new TypeComparator[]{this};
+
+	protected final boolean ascending;
+	protected transient T reference;
+	protected final C comparator;
+
+	public PrimitiveArrayComparator(boolean ascending, C comparator) {
+		this.ascending = ascending;
+		this.comparator = comparator;
+	}
+
+	@Override
+	public void setReference(T toCompare) {
+		this.reference = toCompare;
+	}
+
+	@Override
+	public boolean equalToReference(T candidate) {
+		return compare(this.reference, candidate) == 0;
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		return compare(((PrimitiveArrayComparator<T, C>) referencedComparator).reference, this.reference);
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		int firstCount = firstSource.readInt();
+		int secondCount = secondSource.readInt();
+		for (int x = 0; x < min(firstCount, secondCount); x++) {
+			int cmp = comparator.compareSerialized(firstSource, secondSource);
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+		int cmp = firstCount - secondCount;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator[] getFlatComparators() {
+		return comparators;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return false;
+	}
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return 0;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascending;
+	}
+}
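
For orientation, a minimal usage sketch of the reference-based TypeComparator API this abstract class provides, shown with the int[] subclass from this commit. The example class name is made up for illustration and is not part of this commit:

	import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArrayComparator;

	public class PrimitiveArrayComparatorExample {
		public static void main(String[] args) {
			IntPrimitiveArrayComparator comparator = new IntPrimitiveArrayComparator(true);

			int[] key = {1, 2, 3};
			comparator.setReference(key);

			// equalToReference() delegates to compare(), so equal contents match
			System.out.println(comparator.equalToReference(new int[]{1, 2, 3}));    // true
			System.out.println(comparator.equalToReference(new int[]{1, 2, 4}));    // false

			// hash() sums the element hashes, so arrays with equal contents hash alike
			System.out.println(comparator.hash(key) == comparator.hash(new int[]{1, 2, 3}));    // true

			// duplicate() copies both the sort direction and the current reference
			System.out.println(comparator.duplicate().equalToReference(key));       // true
		}
	}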

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
new file mode 100644
index 0000000..5943694
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.ShortComparator;
+
+public class ShortPrimitiveArrayComparator extends PrimitiveArrayComparator<short[], ShortComparator> {
+	public ShortPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new ShortComparator(ascending));
+	}
+
+	@Override
+	public int hash(short[] record) {
+		int result = 0;
+		for (short field : record) {
+			result += (int) field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(short[] first, short[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] - second[x];
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<short[]> duplicate() {
+		ShortPrimitiveArrayComparator dupe = new ShortPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..4db71bf
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class BooleanPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<boolean[]> {
+	public BooleanPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, boolean[] should, boolean[] is) {
+		Assert.assertEquals(message, should.length, is.length);
+		for (int x = 0; x < should.length; x++) {
+			Assert.assertEquals(message, should[x], is[x]);
+		}
+	}
+
+	@Override
+	protected boolean[][] getSortedTestData() {
+		return new boolean[][]{
+			new boolean[]{false, false},
+			new boolean[]{false, true},
+			new boolean[]{false, true, true},
+			new boolean[]{true},
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..4c57702
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class BytePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<byte[]> {
+	public BytePrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, byte[] should, byte[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected byte[][] getSortedTestData() {
+		return new byte[][]{
+			new byte[]{-1, 0},
+			new byte[]{0, -1},
+			new byte[]{0, 0},
+			new byte[]{0, 1},
+			new byte[]{0, 1, 2},
+			new byte[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..b318168
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class CharPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<char[]> {
+	public CharPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, char[] should, char[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected char[][] getSortedTestData() {
+		return new char[][]{
+			new char[]{0, 0},
+			new char[]{0, 1},
+			new char[]{0, 1, 2},
+			new char[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..b5d7e1d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class DoublePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<double[]> {
+	public DoublePrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, double[] should, double[] is) {
+		Assert.assertArrayEquals(message, should, is, 0.00001);
+	}
+
+	@Override
+	protected double[][] getSortedTestData() {
+		return new double[][]{
+			new double[]{-1, 0},
+			new double[]{0, -1},
+			new double[]{0, 0},
+			new double[]{0, 1},
+			new double[]{0, 1, 2},
+			new double[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..830049e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class FloatPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<float[]> {
+	public FloatPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, float[] should, float[] is) {
+		Assert.assertArrayEquals(message, should, is, (float) 0.00001);
+	}
+
+	@Override
+	protected float[][] getSortedTestData() {
+		return new float[][]{
+			new float[]{-1, 0},
+			new float[]{0, -1},
+			new float[]{0, 0},
+			new float[]{0, 1},
+			new float[]{0, 1, 2},
+			new float[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..6c05f23
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class IntPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<int[]> {
+	public IntPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, int[] should, int[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected int[][] getSortedTestData() {
+		return new int[][]{
+			new int[]{-1, 0},
+			new int[]{0, -1},
+			new int[]{0, 0},
+			new int[]{0, 1},
+			new int[]{0, 1, 2},
+			new int[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..0ae573e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class LongPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<long[]> {
+	public LongPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, long[] should, long[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected long[][] getSortedTestData() {
+		return new long[][]{
+			new long[]{-1, 0},
+			new long[]{0, -1},
+			new long[]{0, 0},
+			new long[]{0, 1},
+			new long[]{0, 1, 2},
+			new long[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
new file mode 100644
index 0000000..ff620dd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public abstract class PrimitiveArrayComparatorTestBase<T> extends ComparatorTestBase<T> {
+	private PrimitiveArrayTypeInfo<T> info;
+
+	public PrimitiveArrayComparatorTestBase(PrimitiveArrayTypeInfo<T> info) {
+		this.info = info;
+	}
+
+	@Override
+	protected TypeComparator<T> createComparator(boolean ascending) {
+		return info.createComparator(ascending, null).duplicate();
+	}
+
+	@Override
+	protected TypeSerializer<T> createSerializer() {
+		return info.createSerializer(null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..5b48dc2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class ShortPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<short[]> {
+	public ShortPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, short[] should, short[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected short[][] getSortedTestData() {
+		return new short[][]{
+			new short[]{-1, 0},
+			new short[]{0, -1},
+			new short[]{0, 0},
+			new short[]{0, 1},
+			new short[]{0, 1, 2},
+			new short[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index b3922b3..bdad3db 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -67,8 +67,9 @@ public class GroupingTest {
 
 	private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
 			new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
-
 	
+	private final List<Tuple2<byte[], byte[]>> byteArrayData = new ArrayList<Tuple2<byte[], byte[]>>();
+
 	@Test  
 	public void testGroupByKeyFields1() {
 		
@@ -127,6 +128,15 @@ public class GroupingTest {
 	}
 
 	@Test
+	public void testGroupByKeyFieldsOnPrimitiveArray() {
+		this.byteArrayData.add(new Tuple2<byte[], byte[]>(new byte[]{0}, new byte[]{1}));
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<byte[], byte[]>> tupleDs = env.fromCollection(byteArrayData);
+		tupleDs.groupBy(0);
+	}
+
+	@Test
 	public void testGroupByKeyExpressions1() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -613,7 +623,7 @@ public class GroupingTest {
 	public static class CustomType2 implements Serializable {
 
 		public int myInt;
-		public int[] myIntArray;
+		public Integer[] myIntArray;
 
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 260de1c..95a8cb0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -62,6 +62,37 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testCorrectnessOfGroupReduceOnTupleContainingPrimitiveByteArrayWithKeyFieldSelectors() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<byte[], Integer>> ds = CollectionDataSets.getTuple2WithByteArrayDataSet(env);
+		DataSet<Integer> reduceDs = ds.
+				groupBy(0).reduceGroup(new ByteArrayGroupReduce());
+
+		List<Integer> result = reduceDs.collect();
+
+		String expected = "0\n"
+				+ "1\n"
+				+ "2\n"
+				+ "3\n"
+				+ "4\n";
+
+		compareResultAsText(result, expected);
+	}
+
+	public static class ByteArrayGroupReduce implements GroupReduceFunction<Tuple2<byte[], Integer>, Integer> {
+		@Override
+		public void reduce(Iterable<Tuple2<byte[], Integer>> values, Collector<Integer> out) throws Exception {
+			int sum = 0;
+			for (Tuple2<byte[], Integer> value : values) {
+				sum += value.f1;
+			}
+			out.collect(sum);
+		}
+	}
+
+	@Test
 	public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() throws Exception{
 		/*
 		 * check correctness of groupReduce on tuples with key field selector
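
For context on the expected values in the new test above: with the byte[] fixture added to CollectionDataSets further below, the keys group as {0} -> 0, {0, 4} -> 1, {2, 0} -> 1 + 1 = 2, {2, 1} -> 3, and {2, 0, 4} -> 4, which matches the "0\n1\n2\n3\n4\n" the test compares against.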

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 1faf4c1..9fb275f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -205,6 +206,23 @@ public class CollectionDataSets {
 
 		return env.fromCollection(data, type);
 	}
+	
+	public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
+		List<Tuple2<byte[], Integer>> data = new ArrayList<Tuple2<byte[], Integer>>();
+		data.add(new Tuple2<byte[], Integer>(new byte[]{0, 4}, 1));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0}, 1));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0, 4}, 4));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 1}, 3));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{0}, 0));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0}, 1));
+				
+		TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<Tuple2<byte[], Integer>>(
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO
+		);
+		
+		return env.fromCollection(data, type);
+	}
 
 	public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {