You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/23 12:01:21 UTC

[1/5] flink git commit: [hotfix] Minor code cleanup in StreamTask

Repository: flink
Updated Branches:
  refs/heads/master 345b2529a -> 568845a3c


[hotfix] Minor code cleanup in StreamTask


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ee048bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ee048bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ee048bc

Branch: refs/heads/master
Commit: 4ee048bc0d50a715ea4247408ac16d41cbdba77d
Parents: 345b252
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 22 14:41:13 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 22 14:42:12 2016 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 52 ++++++--------------
 1 file changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ee048bc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d4638a4..49bbee7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +67,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Base class for all streaming tasks. A task is the unit of local processing that is deployed
@@ -135,9 +135,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	/** The configuration of this streaming task */
 	private StreamConfig configuration;
 
-	/** The class loader used to load dynamic classes of a job */
-	private ClassLoader userClassLoader;
-
 	/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
 	private AbstractStateBackend stateBackend;
 
@@ -180,9 +177,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	/** Thread pool for async snapshot workers */
 	private ExecutorService asyncOperationsThreadPool;
 
-	/** Timeout to await the termination of the thread pool in milliseconds */
-	private long threadPoolTerminationTimeout = 0L;
-
 	// ------------------------------------------------------------------------
 	//  Life cycle methods for specific implementations
 	// ------------------------------------------------------------------------
@@ -227,8 +221,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 			asyncOperationsThreadPool = Executors.newCachedThreadPool();
 
-			userClassLoader = getUserCodeClassLoader();
-
 			configuration = new StreamConfig(getTaskConfiguration());
 
 			stateBackend = createStateBackend();
@@ -248,7 +240,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				timerService = DefaultTimeServiceProvider.create(executor);
 			}
 
-			headOperator = configuration.getStreamOperator(userClassLoader);
+			headOperator = configuration.getStreamOperator(getUserCodeClassLoader());
 			operatorChain = new OperatorChain<>(this, headOperator, 
 						getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
 
@@ -446,10 +438,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		if (!asyncOperationsThreadPool.isShutdown()) {
 			asyncOperationsThreadPool.shutdownNow();
 		}
-
-		if(threadPoolTerminationTimeout > 0L) {
-			asyncOperationsThreadPool.awaitTermination(threadPoolTerminationTimeout, TimeUnit.MILLISECONDS);
-		}
 	}
 
 	/**
@@ -581,17 +569,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					if (state == null) {
 						continue;
 					}
-					if (state != null) {
-						StreamOperator<?> operator = allOperators[i];
-
-						if (operator != null) {
-							LOG.debug("Restore state of task {} in chain ({}).", i, getName());
-							FSDataInputStream inputStream = state.openInputStream();
-							try {
-								operator.restoreState(inputStream);
-							} finally {
-								inputStream.close();
-							}
+					StreamOperator<?> operator = allOperators[i];
+
+					if (operator != null) {
+						LOG.debug("Restore state of task {} in chain ({}).", i, getName());
+						try (FSDataInputStream inputStream = state.openInputStream()) {
+							operator.restoreState(inputStream);
 						}
 					}
 				}
@@ -803,7 +786,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					getEnvironment().getTaskKvStateRegistry());
 		}
 
-		return (KeyedStateBackend<K>) keyedStateBackend;
+		@SuppressWarnings("unchecked")
+		KeyedStateBackend<K> typedBackend = (KeyedStateBackend<K>) keyedStateBackend;
+		return typedBackend;
 	}
 
 	/**
@@ -812,7 +797,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	 * checkpoint stream factory to write write-ahead logs. <b>This should not be used for
 	 * anything else.</b>
 	 */
-	public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator operator) throws IOException {
+	public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> operator) throws IOException {
 		return stateBackend.createStreamFactory(
 				getEnvironment().getJobID(),
 				createOperatorIdentifier(
@@ -821,7 +806,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	}
 
-	private String createOperatorIdentifier(StreamOperator operator, int vertexId) {
+	private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
 		return operator.getClass().getSimpleName() +
 				"_" + vertexId +
 				"_" + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
@@ -877,15 +862,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		};
 	}
 
-	/**
-	 * Sets a timeout for the async thread pool. Default should always be 0 to avoid blocking restarts of task.
-	 *
-	 * @param threadPoolTerminationTimeout timeout for the async thread pool in milliseconds
-	 */
-	public void setThreadPoolTerminationTimeout(long threadPoolTerminationTimeout) {
-		this.threadPoolTerminationTimeout = threadPoolTerminationTimeout;
-	}
-
 	// ------------------------------------------------------------------------
 
 	/**
@@ -969,7 +945,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 					KeyGroupsStateHandle keyGroupsStateHandle = this.keyGroupsStateHandleFuture.get();
 					if (keyGroupsStateHandle != null) {
-						keyedStates = Arrays.asList(keyGroupsStateHandle);
+						keyedStates = Collections.singletonList(keyGroupsStateHandle);
 					}
 				}
 


[2/5] flink git commit: [FLINK-4556] [distributed runtime] Make Queryable State Key-Group Aware

Posted by se...@apache.org.
[FLINK-4556] [distributed runtime] Make Queryable State Key-Group Aware

This closes #2523


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/181b5451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/181b5451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/181b5451

Branch: refs/heads/master
Commit: 181b54515ed2701a25d4a71fa7ed52394b2aeb66
Parents: 4ee048b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Sep 20 16:36:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:24:42 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/query/KvStateLocation.java    | 47 ++++++-----
 .../runtime/query/KvStateLocationRegistry.java  | 16 ++--
 .../flink/runtime/query/KvStateMessage.java     | 35 ++++----
 .../flink/runtime/query/KvStateRegistry.java    | 12 +--
 .../runtime/query/KvStateRegistryListener.java  |  9 ++-
 .../runtime/query/QueryableStateClient.java     |  4 +-
 .../runtime/query/TaskKvStateRegistry.java      | 17 ++--
 .../runtime/state/KeyGroupRangeAssignment.java  | 15 +++-
 .../flink/runtime/state/KeyedStateBackend.java  |  8 +-
 .../ActorGatewayKvStateRegistryListener.java    |  9 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +-
 .../runtime/jobmanager/JobManagerTest.java      | 20 ++---
 .../query/KvStateLocationRegistryTest.java      | 35 ++++----
 .../runtime/query/KvStateLocationTest.java      | 85 ++++++++++++++------
 .../runtime/query/QueryableStateClientTest.java | 10 +--
 .../runtime/query/netty/KvStateClientTest.java  |  2 +-
 .../query/netty/KvStateServerHandlerTest.java   | 10 +--
 .../runtime/state/StateBackendTestBase.java     |  9 ++-
 .../flink/test/query/QueryableStateITCase.java  |  1 -
 19 files changed, 207 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 9be22c2..90bb2a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -160,44 +161,50 @@ public class KvStateLocation implements Serializable {
 	/**
 	 * Registers a KvState instance for the given key group index.
 	 *
-	 * @param keyGroupIndex  Key group index to register
+	 * @param keyGroupRange  Key group range to register
 	 * @param kvStateId      ID of the KvState instance at the key group index.
 	 * @param kvStateAddress Server address of the KvState instance at the key group index.
-	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
+	 * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
 	 */
-	void registerKvState(int keyGroupIndex, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
-		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+	void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
+
+		if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
 			throw new IndexOutOfBoundsException("Key group index");
 		}
 
-		if (kvStateIds[keyGroupIndex] == null && kvStateAddresses[keyGroupIndex] == null) {
-			numRegisteredKeyGroups++;
-		}
+		for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) {
+
+			if (kvStateIds[kgIdx] == null && kvStateAddresses[kgIdx] == null) {
+				numRegisteredKeyGroups++;
+			}
 
-		kvStateIds[keyGroupIndex] = kvStateId;
-		kvStateAddresses[keyGroupIndex] = kvStateAddress;
+			kvStateIds[kgIdx] = kvStateId;
+			kvStateAddresses[kgIdx] = kvStateAddress;
+		}
 	}
 
 	/**
 	 * Registers a KvState instance for the given key group index.
 	 *
-	 * @param keyGroupIndex Key group index to unregister.
-	 * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
-	 * @throws IllegalArgumentException If no location information registered for key group index.
+	 * @param keyGroupRange Key group range to unregister.
+	 * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
+	 * @throws IllegalArgumentException  If no location information registered for a key group index in the range.
 	 */
-	void unregisterKvState(int keyGroupIndex) {
-		if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+	void unregisterKvState(KeyGroupRange keyGroupRange) {
+		if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
 			throw new IndexOutOfBoundsException("Key group index");
 		}
 
-		if (kvStateIds[keyGroupIndex] == null || kvStateAddresses[keyGroupIndex] == null) {
-			throw new IllegalArgumentException("Not registered. Probably registration/unregistration race.");
-		}
+		for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) {
+			if (kvStateIds[kgIdx] == null || kvStateAddresses[kgIdx] == null) {
+				throw new IllegalArgumentException("Not registered. Probably registration/unregistration race.");
+			}
 
-		numRegisteredKeyGroups--;
+			numRegisteredKeyGroups--;
 
-		kvStateIds[keyGroupIndex] = null;
-		kvStateAddresses[keyGroupIndex] = null;
+			kvStateIds[kgIdx] = null;
+			kvStateAddresses[kgIdx] = null;
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
index 5b76598..c489025 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -73,7 +74,7 @@ public class KvStateLocationRegistry {
 	 * Notifies the registry about a registered KvState instance.
 	 *
 	 * @param jobVertexId JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex Key group index the KvState instance belongs to
+	 * @param keyGroupRange Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState has been registered
 	 * @param kvStateId ID of the registered KvState instance
 	 * @param kvStateServerAddress Server address where to find the KvState instance
@@ -85,7 +86,7 @@ public class KvStateLocationRegistry {
 	 */
 	public void notifyKvStateRegistered(
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvStateID kvStateId,
 			KvStateServerAddress kvStateServerAddress) {
@@ -97,7 +98,7 @@ public class KvStateLocationRegistry {
 			ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
 
 			if (vertex != null) {
-				int parallelism = vertex.getParallelism();
+				int parallelism = vertex.getMaxParallelism();
 				location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
 				lookupTable.put(registrationName, location);
 			} else {
@@ -119,22 +120,21 @@ public class KvStateLocationRegistry {
 
 			throw duplicate;
 		}
-
-		location.registerKvState(keyGroupIndex, kvStateId, kvStateServerAddress);
+		location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
 	}
 
 	/**
 	 * Notifies the registry about an unregistered KvState instance.
 	 *
 	 * @param jobVertexId JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex Key group index the KvState instance belongs to
+	 * @param keyGroupRange Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState has been registered
 	 * @throws IllegalArgumentException If another operator registered the state instance
 	 * @throws IllegalArgumentException If the registration name is not known
 	 */
 	public void notifyKvStateUnregistered(
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName) {
 
 		KvStateLocation location = lookupTable.get(registrationName);
@@ -147,7 +147,7 @@ public class KvStateLocationRegistry {
 						"under '" + registrationName + "'.");
 			}
 
-			location.unregisterKvState(keyGroupIndex);
+			location.unregisterKvState(keyGroupRange);
 
 			if (location.getNumRegisteredKeyGroups() == 0) {
 				lookupTable.remove(registrationName);

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
index 5e3c38e..857b8b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -97,8 +98,8 @@ public interface KvStateMessage extends Serializable {
 		/** JobVertexID the KvState instance belongs to. */
 		private final JobVertexID jobVertexId;
 
-		/** Key group index the KvState instance belongs to. */
-		private final int keyGroupIndex;
+		/** Key group range the KvState instance belongs to. */
+		private final KeyGroupRange keyGroupRange;
 
 		/** Name under which the KvState has been registered. */
 		private final String registrationName;
@@ -114,7 +115,7 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @param jobId                JobID the KvState instance belongs to
 		 * @param jobVertexId          JobVertexID the KvState instance belongs to
-		 * @param keyGroupIndex        Key group index the KvState instance belongs to
+		 * @param keyGroupRange        Key group range the KvState instance belongs to
 		 * @param registrationName     Name under which the KvState has been registered
 		 * @param kvStateId            ID of the registered KvState instance
 		 * @param kvStateServerAddress Server address where to find the KvState instance
@@ -122,15 +123,15 @@ public interface KvStateMessage extends Serializable {
 		public NotifyKvStateRegistered(
 				JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName,
 				KvStateID kvStateId,
 				KvStateServerAddress kvStateServerAddress) {
 
 			this.jobId = Preconditions.checkNotNull(jobId, "JobID");
 			this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
-			Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
-			this.keyGroupIndex = keyGroupIndex;
+			Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP);
+			this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 			this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
 			this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
 			this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
@@ -159,8 +160,8 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @return Key group index the KvState instance belongs to
 		 */
-		public int getKeyGroupIndex() {
-			return keyGroupIndex;
+		public KeyGroupRange getKeyGroupRange() {
+			return keyGroupRange;
 		}
 
 		/**
@@ -195,7 +196,7 @@ public interface KvStateMessage extends Serializable {
 			return "NotifyKvStateRegistered{" +
 					"jobId=" + jobId +
 					", jobVertexId=" + jobVertexId +
-					", keyGroupIndex=" + keyGroupIndex +
+					", keyGroupRange=" + keyGroupRange +
 					", registrationName='" + registrationName + '\'' +
 					", kvStateId=" + kvStateId +
 					", kvStateServerAddress=" + kvStateServerAddress +
@@ -214,7 +215,7 @@ public interface KvStateMessage extends Serializable {
 		private final JobVertexID jobVertexId;
 
 		/** Key group index the KvState instance belongs to. */
-		private final int keyGroupIndex;
+		private final KeyGroupRange keyGroupRange;
 
 		/** Name under which the KvState has been registered. */
 		private final String registrationName;
@@ -224,19 +225,19 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @param jobId                JobID the KvState instance belongs to
 		 * @param jobVertexId          JobVertexID the KvState instance belongs to
-		 * @param keyGroupIndex        Key group index the KvState instance belongs to
+		 * @param keyGroupRange        Key group range the KvState instance belongs to
 		 * @param registrationName     Name under which the KvState has been registered
 		 */
 		public NotifyKvStateUnregistered(
 				JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName) {
 
 			this.jobId = Preconditions.checkNotNull(jobId, "JobID");
 			this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
-			Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
-			this.keyGroupIndex = keyGroupIndex;
+			Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP);
+			this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 			this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
 		}
 
@@ -263,8 +264,8 @@ public interface KvStateMessage extends Serializable {
 		 *
 		 * @return Key group index the KvState instance belongs to
 		 */
-		public int getKeyGroupIndex() {
-			return keyGroupIndex;
+		public KeyGroupRange getKeyGroupRange() {
+			return keyGroupRange;
 		}
 
 		/**
@@ -281,7 +282,7 @@ public interface KvStateMessage extends Serializable {
 			return "NotifyKvStateUnregistered{" +
 					"jobId=" + jobId +
 					", jobVertexId=" + jobVertexId +
-					", keyGroupIndex=" + keyGroupIndex +
+					", keyGroupRange=" + keyGroupRange +
 					", registrationName='" + registrationName + '\'' +
 					'}';
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index f19c123..f57ae47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.query;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.Task;
 
@@ -80,7 +81,7 @@ public class KvStateRegistry {
 	 *
 	 * @param jobId            JobId the KvState instance belongs to
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState is registered
 	 * @param kvState          KvState instance to be registered
 	 * @return Assigned KvStateID
@@ -88,7 +89,7 @@ public class KvStateRegistry {
 	public KvStateID registerKvState(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvState<?> kvState) {
 
@@ -100,7 +101,7 @@ public class KvStateRegistry {
 				listener.notifyKvStateRegistered(
 						jobId,
 						jobVertexId,
-						keyGroupIndex,
+						keyGroupRange,
 						registrationName,
 						kvStateId);
 			}
@@ -116,11 +117,12 @@ public class KvStateRegistry {
 	 *
 	 * @param jobId     JobId the KvState instance belongs to
 	 * @param kvStateId KvStateID to identify the KvState instance
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 */
 	public void unregisterKvState(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvStateID kvStateId) {
 
@@ -130,7 +132,7 @@ public class KvStateRegistry {
 				listener.notifyKvStateUnregistered(
 						jobId,
 						jobVertexId,
-						keyGroupIndex,
+						keyGroupRange,
 						registrationName);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
index 760adf1..29bee9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 
 /**
  * A listener for a {@link KvStateRegistry}.
@@ -34,14 +35,14 @@ public interface KvStateRegistryListener {
 	 *
 	 * @param jobId            Job ID the KvState instance belongs to
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState is registered
 	 * @param kvStateId        ID of the KvState instance
 	 */
 	void notifyKvStateRegistered(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName,
 			KvStateID kvStateId);
 
@@ -50,13 +51,13 @@ public interface KvStateRegistryListener {
 	 *
 	 * @param jobId            Job ID the KvState instance belongs to
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to
-	 * @param keyGroupIndex    Key group index the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName Name under which the KvState is registered
 	 */
 	void notifyKvStateUnregistered(
 			JobID jobId,
 			JobVertexID jobVertexId,
-			int keyGroupIndex,
+			KeyGroupRange keyGroupRange,
 			String registrationName);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 0e1ea57..591c67d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.query.netty.KvStateClient;
 import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.query.netty.UnknownKvStateID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -296,7 +296,7 @@ public class QueryableStateClient {
 				.flatMap(new Mapper<KvStateLocation, Future<byte[]>>() {
 					@Override
 					public Future<byte[]> apply(KvStateLocation lookup) {
-						int keyGroupIndex = MathUtils.murmurHash(keyHashCode) % lookup.getNumKeyGroups();
+						int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups());
 
 						KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
 						if (serverAddress == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index b5c09aa..d831214 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 
@@ -52,15 +53,15 @@ public class TaskKvStateRegistry {
 	/**
 	 * Registers the KvState instance at the KvStateRegistry.
 	 *
-	 * @param keyGroupIndex    KeyGroupIndex the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
 	 * @param registrationName The registration name (not necessarily the same
 	 *                         as the KvState name defined in the state
 	 *                         descriptor used to create the KvState instance)
 	 * @param kvState          The
 	 */
-	public void registerKvState(int keyGroupIndex, String registrationName, KvState<?> kvState) {
-		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupIndex, registrationName, kvState);
-		registeredKvStates.add(new KvStateInfo(keyGroupIndex, registrationName, kvStateId));
+	public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, KvState<?> kvState) {
+		KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
+		registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
 	}
 
 	/**
@@ -68,7 +69,7 @@ public class TaskKvStateRegistry {
 	 */
 	public void unregisterAll() {
 		for (KvStateInfo kvState : registeredKvStates) {
-			registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupIndex, kvState.registrationName, kvState.kvStateId);
+			registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupRange, kvState.registrationName, kvState.kvStateId);
 		}
 	}
 
@@ -77,14 +78,14 @@ public class TaskKvStateRegistry {
 	 */
 	private static class KvStateInfo {
 
-		private final int keyGroupIndex;
+		private final KeyGroupRange keyGroupRange;
 
 		private final String registrationName;
 
 		private final KvStateID kvStateId;
 
-		public KvStateInfo(int keyGroupIndex, String registrationName, KvStateID kvStateId) {
-			this.keyGroupIndex = keyGroupIndex;
+		public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+			this.keyGroupRange = keyGroupRange;
 			this.registrationName = registrationName;
 			this.kvStateId = kvStateId;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index eceb6f4..894f721 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -48,8 +48,19 @@ public final class KeyGroupRangeAssignment {
 	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
 	 * @return the key-group to which the given key is assigned
 	 */
-	public static final int assignToKeyGroup(Object key, int maxParallelism) {
-		return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+	public static int assignToKeyGroup(Object key, int maxParallelism) {
+		return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
+	}
+
+	/**
+	 * Assigns the given key hash to a key-group index.
+	 *
+	 * @param keyHash the hash of the key to assign
+	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
+	 * @return the key-group to which the given key is assigned
+	 */
+	public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
+		return MathUtils.murmurHash(keyHash) % maxParallelism;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 8db63ee..5612f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -269,8 +269,7 @@ public abstract class KeyedStateBackend<K> {
 			}
 
 			String name = stateDescriptor.getQueryableStateName();
-			// TODO: deal with key group indices here
-			kvStateRegistry.registerKvState(0, name, kvState);
+			kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
 		}
 
 		return state;
@@ -336,4 +335,9 @@ public abstract class KeyedStateBackend<K> {
 			long checkpointId,
 			long timestamp,
 			CheckpointStreamFactory streamFactory) throws Exception;
+
+
+	public KeyGroupRange getKeyGroupRange() {
+		return keyGroupRange;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
index 2d69938..4404867 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -49,14 +50,14 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe
 	public void notifyKvStateRegistered(
 		JobID jobId,
 		JobVertexID jobVertexId,
-		int keyGroupIndex,
+		KeyGroupRange keyGroupRange,
 		String registrationName,
 		KvStateID kvStateId) {
 
 		Object msg = new KvStateMessage.NotifyKvStateRegistered(
 			jobId,
 			jobVertexId,
-			keyGroupIndex,
+			keyGroupRange,
 			registrationName,
 			kvStateId,
 			kvStateServerAddress);
@@ -68,13 +69,13 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe
 	public void notifyKvStateUnregistered(
 		JobID jobId,
 		JobVertexID jobVertexId,
-		int keyGroupIndex,
+		KeyGroupRange keyGroupRange,
 		String registrationName) {
 
 		Object msg = new KvStateMessage.NotifyKvStateUnregistered(
 			jobId,
 			jobVertexId,
-			keyGroupIndex,
+			keyGroupRange,
 			registrationName);
 
 		jobManager.tell(msg);

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 639c158..fd96f86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1500,7 +1500,7 @@ class JobManager(
 
               graph.getKvStateLocationRegistry.notifyKvStateRegistered(
                 msg.getJobVertexId,
-                msg.getKeyGroupIndex,
+                msg.getKeyGroupRange,
                 msg.getRegistrationName,
                 msg.getKvStateId,
                 msg.getKvStateServerAddress)
@@ -1519,7 +1519,7 @@ class JobManager(
             try {
               graph.getKvStateLocationRegistry.notifyKvStateUnregistered(
                 msg.getJobVertexId,
-                msg.getKeyGroupIndex,
+                msg.getKeyGroupRange,
                 msg.getRegistrationName)
             } catch {
               case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index f925d62..d731b95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
 import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -467,7 +468,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered registerNonExistingJob = new NotifyKvStateRegistered(
 				new JobID(),
 				new JobVertexID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"any-name",
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1233));
@@ -492,7 +493,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered registerForExistingJob = new NotifyKvStateRegistered(
 				jobGraph.getJobID(),
 				jobVertex1.getID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"register-me",
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
@@ -512,11 +513,12 @@ public class JobManagerTest {
 
 		assertEquals(jobGraph.getJobID(), location.getJobId());
 		assertEquals(jobVertex1.getID(), location.getJobVertexId());
-		assertEquals(jobVertex1.getParallelism(), location.getNumKeyGroups());
+		assertEquals(jobVertex1.getMaxParallelism(), location.getNumKeyGroups());
 		assertEquals(1, location.getNumRegisteredKeyGroups());
-		int keyGroupIndex = registerForExistingJob.getKeyGroupIndex();
-		assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupIndex));
-		assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupIndex));
+		KeyGroupRange keyGroupRange = registerForExistingJob.getKeyGroupRange();
+		assertEquals(1, keyGroupRange.getNumberOfKeyGroups());
+		assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupRange.getStartKeyGroup()));
+		assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
 
 		//
 		// Unregistration
@@ -524,7 +526,7 @@ public class JobManagerTest {
 		NotifyKvStateUnregistered unregister = new NotifyKvStateUnregistered(
 				registerForExistingJob.getJobId(),
 				registerForExistingJob.getJobVertexId(),
-				registerForExistingJob.getKeyGroupIndex(),
+				registerForExistingJob.getKeyGroupRange(),
 				registerForExistingJob.getRegistrationName());
 
 		jobManager.tell(unregister);
@@ -546,7 +548,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered register = new NotifyKvStateRegistered(
 				jobGraph.getJobID(),
 				jobVertex1.getID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"duplicate-me",
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
@@ -554,7 +556,7 @@ public class JobManagerTest {
 		NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered(
 				jobGraph.getJobID(),
 				jobVertex2.getID(), // <--- different operator, but...
-				0,
+				new KeyGroupRange(0, 0),
 				"duplicate-me", // ...same name
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 1293));

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
index 70f0ba2..f8005a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.query;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -52,8 +53,8 @@ public class KvStateLocationRegistryTest {
 		// IDs for each key group of each vertex
 		KvStateID[][] ids = new KvStateID[vertices.length][];
 		for (int i = 0; i < ids.length; i++) {
-			ids[i] = new KvStateID[vertices[i].getParallelism()];
-			for (int j = 0; j < vertices[i].getParallelism(); j++) {
+			ids[i] = new KvStateID[vertices[i].getMaxParallelism()];
+			for (int j = 0; j < vertices[i].getMaxParallelism(); j++) {
 				ids[i][j] = new KvStateID();
 			}
 		}
@@ -66,12 +67,12 @@ public class KvStateLocationRegistryTest {
 
 		// Register
 		for (int i = 0; i < vertices.length; i++) {
-			int numKeyGroups = vertices[i].getParallelism();
+			int numKeyGroups = vertices[i].getMaxParallelism();
 			for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
 				// Register
 				registry.notifyKvStateRegistered(
 						vertices[i].getJobVertexId(),
-						keyGroupIndex,
+						new KeyGroupRange(keyGroupIndex, keyGroupIndex),
 						registrationNames[i],
 						ids[i][keyGroupIndex],
 						server);
@@ -83,8 +84,8 @@ public class KvStateLocationRegistryTest {
 			KvStateLocation location = registry.getKvStateLocation(registrationNames[i]);
 			assertNotNull(location);
 
-			int parallelism = vertices[i].getParallelism();
-			for (int keyGroupIndex = 0; keyGroupIndex < parallelism; keyGroupIndex++) {
+			int maxParallelism = vertices[i].getMaxParallelism();
+			for (int keyGroupIndex = 0; keyGroupIndex < maxParallelism; keyGroupIndex++) {
 				assertEquals(ids[i][keyGroupIndex], location.getKvStateID(keyGroupIndex));
 				assertEquals(server, location.getKvStateServerAddress(keyGroupIndex));
 			}
@@ -92,10 +93,10 @@ public class KvStateLocationRegistryTest {
 
 		// Unregister
 		for (int i = 0; i < vertices.length; i++) {
-			int numKeyGroups = vertices[i].getParallelism();
+			int numKeyGroups = vertices[i].getMaxParallelism();
 			JobVertexID jobVertexId = vertices[i].getJobVertexId();
 			for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-				registry.notifyKvStateUnregistered(jobVertexId, keyGroupIndex, registrationNames[i]);
+				registry.notifyKvStateUnregistered(jobVertexId, new KeyGroupRange(keyGroupIndex, keyGroupIndex), registrationNames[i]);
 			}
 		}
 
@@ -121,7 +122,7 @@ public class KvStateLocationRegistryTest {
 		// First operator registers
 		registry.notifyKvStateRegistered(
 				vertices[0].getJobVertexId(),
-				0,
+				new KeyGroupRange(0, 0),
 				registrationName,
 				new KvStateID(),
 				new KvStateServerAddress(InetAddress.getLocalHost(), 12328));
@@ -130,7 +131,7 @@ public class KvStateLocationRegistryTest {
 			// Second operator registers same name
 			registry.notifyKvStateRegistered(
 					vertices[1].getJobVertexId(),
-					0,
+					new KeyGroupRange(0, 0),
 					registrationName,
 					new KvStateID(),
 					new KvStateServerAddress(InetAddress.getLocalHost(), 12032));
@@ -151,7 +152,7 @@ public class KvStateLocationRegistryTest {
 
 		KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap);
 		try {
-			registry.notifyKvStateUnregistered(vertex.getJobVertexId(), 0, "any-name");
+			registry.notifyKvStateUnregistered(vertex.getJobVertexId(), new KeyGroupRange(0, 0), "any-name");
 			fail("Did not throw expected Exception, because of missing registration");
 		} catch (IllegalArgumentException ignored) {
 			// Expected
@@ -179,7 +180,7 @@ public class KvStateLocationRegistryTest {
 		// First operator registers name
 		registry.notifyKvStateRegistered(
 				vertices[0].getJobVertexId(),
-				0,
+				new KeyGroupRange(0, 0),
 				name,
 				new KvStateID(),
 				mock(KvStateServerAddress.class));
@@ -190,7 +191,7 @@ public class KvStateLocationRegistryTest {
 
 			registry.notifyKvStateUnregistered(
 					vertices[0].getJobVertexId(),
-					notRegisteredKeyGroupIndex,
+					new KeyGroupRange(notRegisteredKeyGroupIndex, notRegisteredKeyGroupIndex),
 					name);
 
 			fail("Did not throw expected Exception");
@@ -201,7 +202,7 @@ public class KvStateLocationRegistryTest {
 			// Wrong operator tries to unregister
 			registry.notifyKvStateUnregistered(
 					vertices[1].getJobVertexId(),
-					0,
+					new KeyGroupRange(0, 0),
 					name);
 
 			fail("Did not throw expected Exception");
@@ -210,13 +211,13 @@ public class KvStateLocationRegistryTest {
 	}
 
 	// ------------------------------------------------------------------------
-	
-	private ExecutionJobVertex createJobVertex(int parallelism) {
+
+	private ExecutionJobVertex createJobVertex(int maxParallelism) {
 		JobVertexID id = new JobVertexID();
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 
 		when(vertex.getJobVertexId()).thenReturn(id);
-		when(vertex.getParallelism()).thenReturn(parallelism);
+		when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
 
 		return vertex;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
index 59ac575..ed51f62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -36,55 +39,87 @@ public class KvStateLocationTest {
 		JobID jobId = new JobID();
 		JobVertexID jobVertexId = new JobVertexID();
 		int numKeyGroups = 123;
+		int numRanges = 10;
+		int fract = numKeyGroups / numRanges;
+		int remain = numKeyGroups % numRanges;
+		List<KeyGroupRange> keyGroupRanges = new ArrayList<>(numRanges);
+
+		int start = 0;
+		for (int i = 0; i < numRanges; ++i) {
+			int end = start + fract - 1;
+			if(remain > 0) {
+				--remain;
+				++end;
+			}
+ 			KeyGroupRange range = new KeyGroupRange(start, end);
+			keyGroupRanges.add(range);
+			start = end + 1;
+		}
+
+		System.out.println(keyGroupRanges);
+
 		String registrationName = "asdasdasdasd";
 
 		KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName);
 
-		KvStateID[] kvStateIds = new KvStateID[numKeyGroups];
-		KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numKeyGroups];
+		KvStateID[] kvStateIds = new KvStateID[numRanges];
+		KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numRanges];
 
 		InetAddress host = InetAddress.getLocalHost();
 
 		// Register
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			kvStateIds[keyGroupIndex] = new KvStateID();
-			serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
-
-			location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
-			assertEquals(keyGroupIndex + 1, location.getNumRegisteredKeyGroups());
+		int registeredCount = 0;
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			kvStateIds[rangeIdx] = new KvStateID();
+			serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			location.registerKvState(keyGroupRange, kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
+			registeredCount += keyGroupRange.getNumberOfKeyGroups();
+			assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
 		}
 
 		// Lookup
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
-			assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+				assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup));
+				assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup));
+			}
 		}
 
 		// Overwrite
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			kvStateIds[keyGroupIndex] = new KvStateID();
-			serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			kvStateIds[rangeIdx] = new KvStateID();
+			serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
 
-			location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
-			assertEquals(numKeyGroups, location.getNumRegisteredKeyGroups());
+			location.registerKvState(keyGroupRanges.get(rangeIdx), kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
+			assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
 		}
 
 		// Lookup
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
-			assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+				assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup));
+				assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup));
+			}
 		}
 
 		// Unregister
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			location.unregisterKvState(keyGroupIndex);
-			assertEquals(numKeyGroups - keyGroupIndex - 1, location.getNumRegisteredKeyGroups());
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			location.unregisterKvState(keyGroupRange);
+			registeredCount -= keyGroupRange.getNumberOfKeyGroups();
+			assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
 		}
 
 		// Lookup
-		for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
-			assertEquals(null, location.getKvStateID(keyGroupIndex));
-			assertEquals(null, location.getKvStateServerAddress(keyGroupIndex));
+		for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+			KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+			for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+				assertEquals(null, location.getKvStateID(keyGroup));
+				assertEquals(null, location.getKvStateServerAddress(keyGroup));
+			}
 		}
 
 		assertEquals(0, location.getNumRegisteredKeyGroups());

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 405f962..1039568 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -154,7 +154,7 @@ public class QueryableStateClientTest {
 			KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
 			KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
 			for (int i = 0; i < numKeyGroups; i++) {
-				location.registerKvState(i, kvStateId, serverAddress);
+				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
 			}
 
 			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
@@ -184,7 +184,7 @@ public class QueryableStateClientTest {
 			serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
 			location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
 			for (int i = 0; i < numKeyGroups; i++) {
-				location.registerKvState(i, kvStateId, serverAddress);
+				location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
 			}
 
 			when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
@@ -281,7 +281,7 @@ public class QueryableStateClientTest {
 				kvStateIds[i] = registries[i].registerKvState(
 						jobId,
 						new JobVertexID(),
-						i, // key group index
+						new KeyGroupRange(i, i),
 						"choco",
 						kvState);
 			}
@@ -302,7 +302,7 @@ public class QueryableStateClientTest {
 			// Location lookup service
 			KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
 			for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
-				location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
+				location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
 			}
 
 			KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
@@ -385,7 +385,7 @@ public class QueryableStateClientTest {
 
 		// Exact contents don't matter here
 		KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
-		location.registerKvState(0, new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
+		location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
 
 		JobID jobId1 = new JobID();
 		JobID jobId2 = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index f785174..c8fb4bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -595,7 +595,7 @@ public class KvStateClientTest {
 				KvState<?> kvState = (KvState<?>) state;
 
 				// Register KvState (one state instance for all server)
-				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), 0, "any", kvState);
+				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
 			}
 
 			final KvStateClient finalClient = client;

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 52c807f..7e6d713 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -278,7 +278,7 @@ public class KvStateServerHandlerTest {
 		KvStateID kvStateId = registry.registerKvState(
 				new JobID(),
 				new JobVertexID(),
-				0,
+				new KeyGroupRange(0, 0),
 				"vanilla",
 				kvState);
 
@@ -681,18 +681,18 @@ public class KvStateServerHandlerTest {
 	 */
 	static class TestRegistryListener implements KvStateRegistryListener {
 		volatile JobVertexID jobVertexID;
-		volatile int keyGroupIndex;
+		volatile KeyGroupRange keyGroupIndex;
 		volatile String registrationName;
 		volatile KvStateID kvStateId;
 
 		@Override
 		public void notifyKvStateRegistered(JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName,
 				KvStateID kvStateId) {
 			this.jobVertexID = jobVertexId;
-			this.keyGroupIndex = keyGroupIndex;
+			this.keyGroupIndex = keyGroupRange;
 			this.registrationName = registrationName;
 			this.kvStateId = kvStateId;
 		}
@@ -700,7 +700,7 @@ public class KvStateServerHandlerTest {
 		@Override
 		public void notifyKvStateUnregistered(JobID jobId,
 				JobVertexID jobVertexId,
-				int keyGroupIndex,
+				KeyGroupRange keyGroupRange,
 				String registrationName) {
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 33ec182..73e2808 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1108,6 +1108,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+		KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
 
 		KvStateRegistryListener listener = mock(KvStateRegistryListener.class);
 		registry.registerListener(listener);
@@ -1122,7 +1123,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		// Verify registered
 		verify(listener, times(1)).notifyKvStateRegistered(
-				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
 		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
@@ -1130,7 +1131,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		backend.close();
 
 		verify(listener, times(1)).notifyKvStateUnregistered(
-				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"));
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"));
 		backend.close();
 		// Initialize again
 		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
@@ -1140,12 +1141,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 		// Verify registered again
 		verify(listener, times(2)).notifyKvStateRegistered(
-				eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+				eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 		backend.close();
 
 	}
-	
+
 	@Test
 	public void testEmptyStateCheckpointing() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index b99858a..1259460 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -93,7 +93,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@Ignore
 public class QueryableStateITCase extends TestLogger {
 
 	private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);


[3/5] flink git commit: [FLINK-4663] [jdbc] Fix JDBCOutputFormat log message

Posted by se...@apache.org.
[FLINK-4663] [jdbc] Fix JDBCOutputFormat log message

This closes #2534


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/335a2826
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/335a2826
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/335a2826

Branch: refs/heads/master
Commit: 335a2826481725aabc86de8068909060894a274b
Parents: 181b545
Author: swapnil-chougule <sw...@pubmatic.com>
Authored: Thu Sep 22 18:01:20 2016 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:26:02 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/335a2826/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 5464a94..da4b1ad 100644
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -108,7 +108,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	@Override
 	public void writeRecord(Row row) throws IOException {
 
-		if (typesArray != null && typesArray.length > 0 && typesArray.length == row.productArity()) {
+		if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
 			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
 		} 
 		try {


[5/5] flink git commit: [FLINK-4666] [core] Declare constants as final in ParameterTool

Posted by se...@apache.org.
[FLINK-4666] [core] Declare constants as final in ParameterTool

This closes #2538


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/568845a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/568845a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/568845a3

Branch: refs/heads/master
Commit: 568845a3cfea1f6fe71a99f1f546cbd4d947fb33
Parents: 37c9512
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Thu Sep 22 22:54:29 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:34:42 2016 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/api/java/utils/ParameterTool.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/568845a3/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 1c03c08..8f504e4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -45,8 +45,8 @@ import java.util.Properties;
 public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable {
 	private static final long serialVersionUID = 1L;
 
-	protected static String NO_VALUE_KEY = "__NO_VALUE_KEY";
-	protected static String DEFAULT_UNDEFINED = "<undefined>";
+	protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
+	protected static final String DEFAULT_UNDEFINED = "<undefined>";
 
 
 	// ------------------ Constructors ------------------------


[4/5] flink git commit: [FLINK-4665] [misc] Remove unnecessary boxing/unboxing of primitive types in various places

Posted by se...@apache.org.
[FLINK-4665] [misc] Remove unnecessary boxing/unboxing of primitive types in various places

This closes #2537


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37c9512f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37c9512f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37c9512f

Branch: refs/heads/master
Commit: 37c9512fadb2b2ad6d3167b97d25fd00a2fa7620
Parents: 335a282
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Thu Sep 22 20:27:08 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:33:18 2016 +0200

----------------------------------------------------------------------
 .../api/java/sca/NestedMethodAnalyzer.java      |  2 +-
 .../aggregation/FloatSummaryAggregator.java     |  2 +-
 .../flink/api/java/utils/ParameterTool.java     | 16 +++++++---------
 .../flink/api/java/io/CsvInputFormatTest.java   | 20 ++++++++++----------
 .../api/java/sca/UdfAnalyzerExamplesTest.java   |  2 +-
 .../flink/api/java/sca/UdfAnalyzerTest.java     |  4 ++--
 6 files changed, 22 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
index 302df4e..7304c50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
@@ -607,7 +607,7 @@ public class NestedMethodAnalyzer extends BasicInterpreter {
 						else {
 							final int constant = tagged(values.get(2)).getIntConstant();
 
-							if (constant < 0 || Integer.valueOf(methodOwner.split("Tuple")[1]) <= constant ) {
+							if (constant < 0 || Integer.parseInt(methodOwner.split("Tuple")[1]) <= constant ) {
 								analyzer.handleInvalidTupleAccess();
 							}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
index 745ca41..bc78841 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
@@ -89,7 +89,7 @@ public class FloatSummaryAggregator extends NumericSummaryAggregator<Float> {
 		@Override
 		public Float result() {
 			// overflow will go to infinity
-			return new Double(sum.value()).floatValue();
+			return (float) sum.value();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index a9389a5..1c03c08 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -19,8 +19,8 @@ package org.apache.flink.api.java.utils;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
@@ -275,7 +275,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	public int getInt(String key) {
 		addToDefaults(key, null);
 		String value = getRequired(key);
-		return Integer.valueOf(value);
+		return Integer.parseInt(value);
 	}
 
 	/**
@@ -285,11 +285,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	public int getInt(String key, int defaultValue) {
 		addToDefaults(key, Integer.toString(defaultValue));
 		String value = get(key);
-		if(value == null) {
+		if (value == null) {
 			return defaultValue;
-		} else {
-			return Integer.valueOf(value);
 		}
+		return Integer.parseInt(value);
 	}
 
 	// -------------- LONG
@@ -301,7 +300,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	public long getLong(String key) {
 		addToDefaults(key, null);
 		String value = getRequired(key);
-		return Long.valueOf(value);
+		return Long.parseLong(value);
 	}
 
 	/**
@@ -311,11 +310,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	public long getLong(String key, long defaultValue) {
 		addToDefaults(key, Long.toString(defaultValue));
 		String value = get(key);
-		if(value == null) {
+		if (value == null) {
 			return defaultValue;
-		} else {
-			return Long.valueOf(value);
 		}
+		return Long.parseLong(value);
 	}
 
 	// -------------- FLOAT

http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index fa091d9..54f226c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -116,19 +116,19 @@ public class CsvInputFormatTest {
 					if (recordCounter == 1) {
 						assertNotNull(result);
 						assertEquals("this is", result.f0);
-						assertEquals(new Integer(1), result.f1);
+						assertEquals(Integer.valueOf(1), result.f1);
 						assertEquals(new Double(2.0), result.f2);
 						assertEquals((long) format.getCurrentState(), 15);
 					} else if (recordCounter == 2) {
 						assertNotNull(result);
 						assertEquals("a test", result.f0);
-						assertEquals(new Integer(3), result.f1);
+						assertEquals(Integer.valueOf(3), result.f1);
 						assertEquals(new Double(4.0), result.f2);
 						assertEquals((long) format.getCurrentState(), 29);
 					} else if (recordCounter == 3) {
 						assertNotNull(result);
 						assertEquals("#next", result.f0);
-						assertEquals(new Integer(5), result.f1);
+						assertEquals(Integer.valueOf(5), result.f1);
 						assertEquals(new Double(6.0), result.f2);
 						assertEquals((long) format.getCurrentState(), 42);
 					} else {
@@ -196,21 +196,21 @@ public class CsvInputFormatTest {
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("this is", result.f0);
-			assertEquals(new Integer(1), result.f1);
+			assertEquals(Integer.valueOf(1), result.f1);
 			assertEquals(new Double(2.0), result.f2);
 			assertEquals((long) format.getCurrentState(), 65);
 
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("a test", result.f0);
-			assertEquals(new Integer(3), result.f1);
+			assertEquals(Integer.valueOf(3), result.f1);
 			assertEquals(new Double(4.0), result.f2);
 			assertEquals((long) format.getCurrentState(), 91);
 
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("#next", result.f0);
-			assertEquals(new Integer(5), result.f1);
+			assertEquals(Integer.valueOf(5), result.f1);
 			assertEquals(new Double(6.0), result.f2);
 			assertEquals((long) format.getCurrentState(), 104);
 
@@ -248,13 +248,13 @@ public class CsvInputFormatTest {
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("this is", result.f0);
-			assertEquals(new Integer(1), result.f1);
+			assertEquals(Integer.valueOf(1), result.f1);
 			assertEquals(new Double(2.0), result.f2);
 			
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("a test", result.f0);
-			assertEquals(new Integer(3), result.f1);
+			assertEquals(Integer.valueOf(3), result.f1);
 			assertEquals(new Double(4.0), result.f2);
 
 			result = format.nextRecord(result);
@@ -292,13 +292,13 @@ public class CsvInputFormatTest {
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("this is", result.f0);
-			assertEquals(new Integer(1), result.f1);
+			assertEquals(Integer.valueOf(1), result.f1);
 			assertEquals(new Double(2.0), result.f2);
 			
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("a test", result.f0);
-			assertEquals(new Integer(3), result.f1);
+			assertEquals(Integer.valueOf(3), result.f1);
 			assertEquals(new Double(4.0), result.f2);
 			
 			result = format.nextRecord(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index 11ab5f2..01dc070 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -546,7 +546,7 @@ public class UdfAnalyzerExamplesTest {
 			for (int i = 0; i < split.length; i++) {
 
 				if (i == 42 - 1) {
-					p.setLabel(new Integer(split[i].trim().substring(0, 1)));
+					p.setLabel(Integer.valueOf(split[i].trim().substring(0, 1)));
 				} else {
 					if (a < 42 && !split[i].trim().isEmpty()) {
 						features[a++] = Double.parseDouble(split[i].trim());

http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index 20ce261..ac35793 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,7 +36,6 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -700,7 +700,7 @@ public class UdfAnalyzerTest {
 		@SuppressWarnings("unchecked")
 		@Override
 		public Tuple1<Tuple1<String>> map(Tuple1<Tuple1<String>> value) throws Exception {
-			((Tuple1<String>) value.getField(Integer.valueOf("2."))).f0 = "Hello";
+			((Tuple1<String>) value.getField(Integer.parseInt("2."))).f0 = "Hello";
 			return value;
 		}
 	}