You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/23 12:01:21 UTC
[1/5] flink git commit: [hotfix] Minor code cleanup in StreamTask
Repository: flink
Updated Branches:
refs/heads/master 345b2529a -> 568845a3c
[hotfix] Minor code cleanup in StreamTask
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ee048bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ee048bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ee048bc
Branch: refs/heads/master
Commit: 4ee048bc0d50a715ea4247408ac16d41cbdba77d
Parents: 345b252
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 22 14:41:13 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 22 14:42:12 2016 +0200
----------------------------------------------------------------------
.../streaming/runtime/tasks/StreamTask.java | 52 ++++++--------------
1 file changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4ee048bc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d4638a4..49bbee7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +67,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
@@ -135,9 +135,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
/** The configuration of this streaming task */
private StreamConfig configuration;
- /** The class loader used to load dynamic classes of a job */
- private ClassLoader userClassLoader;
-
/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
private AbstractStateBackend stateBackend;
@@ -180,9 +177,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
/** Thread pool for async snapshot workers */
private ExecutorService asyncOperationsThreadPool;
- /** Timeout to await the termination of the thread pool in milliseconds */
- private long threadPoolTerminationTimeout = 0L;
-
// ------------------------------------------------------------------------
// Life cycle methods for specific implementations
// ------------------------------------------------------------------------
@@ -227,8 +221,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
asyncOperationsThreadPool = Executors.newCachedThreadPool();
- userClassLoader = getUserCodeClassLoader();
-
configuration = new StreamConfig(getTaskConfiguration());
stateBackend = createStateBackend();
@@ -248,7 +240,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
timerService = DefaultTimeServiceProvider.create(executor);
}
- headOperator = configuration.getStreamOperator(userClassLoader);
+ headOperator = configuration.getStreamOperator(getUserCodeClassLoader());
operatorChain = new OperatorChain<>(this, headOperator,
getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -446,10 +438,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
if (!asyncOperationsThreadPool.isShutdown()) {
asyncOperationsThreadPool.shutdownNow();
}
-
- if(threadPoolTerminationTimeout > 0L) {
- asyncOperationsThreadPool.awaitTermination(threadPoolTerminationTimeout, TimeUnit.MILLISECONDS);
- }
}
/**
@@ -581,17 +569,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
if (state == null) {
continue;
}
- if (state != null) {
- StreamOperator<?> operator = allOperators[i];
-
- if (operator != null) {
- LOG.debug("Restore state of task {} in chain ({}).", i, getName());
- FSDataInputStream inputStream = state.openInputStream();
- try {
- operator.restoreState(inputStream);
- } finally {
- inputStream.close();
- }
+ StreamOperator<?> operator = allOperators[i];
+
+ if (operator != null) {
+ LOG.debug("Restore state of task {} in chain ({}).", i, getName());
+ try (FSDataInputStream inputStream = state.openInputStream()) {
+ operator.restoreState(inputStream);
}
}
}
@@ -803,7 +786,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
getEnvironment().getTaskKvStateRegistry());
}
- return (KeyedStateBackend<K>) keyedStateBackend;
+ @SuppressWarnings("unchecked")
+ KeyedStateBackend<K> typedBackend = (KeyedStateBackend<K>) keyedStateBackend;
+ return typedBackend;
}
/**
@@ -812,7 +797,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
* checkpoint stream factory to write write-ahead logs. <b>This should not be used for
* anything else.</b>
*/
- public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator operator) throws IOException {
+ public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> operator) throws IOException {
return stateBackend.createStreamFactory(
getEnvironment().getJobID(),
createOperatorIdentifier(
@@ -821,7 +806,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
- private String createOperatorIdentifier(StreamOperator operator, int vertexId) {
+ private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
return operator.getClass().getSimpleName() +
"_" + vertexId +
"_" + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
@@ -877,15 +862,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
};
}
- /**
- * Sets a timeout for the async thread pool. Default should always be 0 to avoid blocking restarts of task.
- *
- * @param threadPoolTerminationTimeout timeout for the async thread pool in milliseconds
- */
- public void setThreadPoolTerminationTimeout(long threadPoolTerminationTimeout) {
- this.threadPoolTerminationTimeout = threadPoolTerminationTimeout;
- }
-
// ------------------------------------------------------------------------
/**
@@ -969,7 +945,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
KeyGroupsStateHandle keyGroupsStateHandle = this.keyGroupsStateHandleFuture.get();
if (keyGroupsStateHandle != null) {
- keyedStates = Arrays.asList(keyGroupsStateHandle);
+ keyedStates = Collections.singletonList(keyGroupsStateHandle);
}
}
[2/5] flink git commit: [FLINK-4556] [distributed runtime] Make
Queryable State Key-Group Aware
Posted by se...@apache.org.
[FLINK-4556] [distributed runtime] Make Queryable State Key-Group Aware
This closes #2523
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/181b5451
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/181b5451
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/181b5451
Branch: refs/heads/master
Commit: 181b54515ed2701a25d4a71fa7ed52394b2aeb66
Parents: 4ee048b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Sep 20 16:36:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:24:42 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/query/KvStateLocation.java | 47 ++++++-----
.../runtime/query/KvStateLocationRegistry.java | 16 ++--
.../flink/runtime/query/KvStateMessage.java | 35 ++++----
.../flink/runtime/query/KvStateRegistry.java | 12 +--
.../runtime/query/KvStateRegistryListener.java | 9 ++-
.../runtime/query/QueryableStateClient.java | 4 +-
.../runtime/query/TaskKvStateRegistry.java | 17 ++--
.../runtime/state/KeyGroupRangeAssignment.java | 15 +++-
.../flink/runtime/state/KeyedStateBackend.java | 8 +-
.../ActorGatewayKvStateRegistryListener.java | 9 ++-
.../flink/runtime/jobmanager/JobManager.scala | 4 +-
.../runtime/jobmanager/JobManagerTest.java | 20 ++---
.../query/KvStateLocationRegistryTest.java | 35 ++++----
.../runtime/query/KvStateLocationTest.java | 85 ++++++++++++++------
.../runtime/query/QueryableStateClientTest.java | 10 +--
.../runtime/query/netty/KvStateClientTest.java | 2 +-
.../query/netty/KvStateServerHandlerTest.java | 10 +--
.../runtime/state/StateBackendTestBase.java | 9 ++-
.../flink/test/query/QueryableStateITCase.java | 1 -
19 files changed, 207 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 9be22c2..90bb2a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
@@ -160,44 +161,50 @@ public class KvStateLocation implements Serializable {
/**
* Registers a KvState instance for the given key group range.
*
- * @param keyGroupIndex Key group index to register
+ * @param keyGroupRange Key group range to register
* @param kvStateId ID of the KvState instance at the key group index.
* @param kvStateAddress Server address of the KvState instance at the key group index.
- * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
+ * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
*/
- void registerKvState(int keyGroupIndex, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
- if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+ void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) {
+
+ if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
throw new IndexOutOfBoundsException("Key group index");
}
- if (kvStateIds[keyGroupIndex] == null && kvStateAddresses[keyGroupIndex] == null) {
- numRegisteredKeyGroups++;
- }
+ for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) {
+
+ if (kvStateIds[kgIdx] == null && kvStateAddresses[kgIdx] == null) {
+ numRegisteredKeyGroups++;
+ }
- kvStateIds[keyGroupIndex] = kvStateId;
- kvStateAddresses[keyGroupIndex] = kvStateAddress;
+ kvStateIds[kgIdx] = kvStateId;
+ kvStateAddresses[kgIdx] = kvStateAddress;
+ }
}
/**
* Unregisters the KvState instance for the given key group range.
*
- * @param keyGroupIndex Key group index to unregister.
- * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups
- * @throws IllegalArgumentException If no location information registered for key group index.
+ * @param keyGroupRange Key group range to unregister.
+ * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups
+ * @throws IllegalArgumentException If no location information registered for a key group index in the range.
*/
- void unregisterKvState(int keyGroupIndex) {
- if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
+ void unregisterKvState(KeyGroupRange keyGroupRange) {
+ if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
throw new IndexOutOfBoundsException("Key group index");
}
- if (kvStateIds[keyGroupIndex] == null || kvStateAddresses[keyGroupIndex] == null) {
- throw new IllegalArgumentException("Not registered. Probably registration/unregistration race.");
- }
+ for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) {
+ if (kvStateIds[kgIdx] == null || kvStateAddresses[kgIdx] == null) {
+ throw new IllegalArgumentException("Not registered. Probably registration/unregistration race.");
+ }
- numRegisteredKeyGroups--;
+ numRegisteredKeyGroups--;
- kvStateIds[keyGroupIndex] = null;
- kvStateAddresses[keyGroupIndex] = null;
+ kvStateIds[kgIdx] = null;
+ kvStateAddresses[kgIdx] = null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
index 5b76598..c489025 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
@@ -73,7 +74,7 @@ public class KvStateLocationRegistry {
* Notifies the registry about a registered KvState instance.
*
* @param jobVertexId JobVertexID the KvState instance belongs to
- * @param keyGroupIndex Key group index the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState has been registered
* @param kvStateId ID of the registered KvState instance
* @param kvStateServerAddress Server address where to find the KvState instance
@@ -85,7 +86,7 @@ public class KvStateLocationRegistry {
*/
public void notifyKvStateRegistered(
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId,
KvStateServerAddress kvStateServerAddress) {
@@ -97,7 +98,7 @@ public class KvStateLocationRegistry {
ExecutionJobVertex vertex = jobVertices.get(jobVertexId);
if (vertex != null) {
- int parallelism = vertex.getParallelism();
+ int parallelism = vertex.getMaxParallelism();
location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName);
lookupTable.put(registrationName, location);
} else {
@@ -119,22 +120,21 @@ public class KvStateLocationRegistry {
throw duplicate;
}
-
- location.registerKvState(keyGroupIndex, kvStateId, kvStateServerAddress);
+ location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress);
}
/**
* Notifies the registry about an unregistered KvState instance.
*
* @param jobVertexId JobVertexID the KvState instance belongs to
- * @param keyGroupIndex Key group index the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState has been registered
* @throws IllegalArgumentException If another operator registered the state instance
* @throws IllegalArgumentException If the registration name is not known
*/
public void notifyKvStateUnregistered(
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName) {
KvStateLocation location = lookupTable.get(registrationName);
@@ -147,7 +147,7 @@ public class KvStateLocationRegistry {
"under '" + registrationName + "'.");
}
- location.unregisterKvState(keyGroupIndex);
+ location.unregisterKvState(keyGroupRange);
if (location.getNumRegisteredKeyGroups() == 0) {
lookupTable.remove(registrationName);
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
index 5e3c38e..857b8b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
@@ -97,8 +98,8 @@ public interface KvStateMessage extends Serializable {
/** JobVertexID the KvState instance belongs to. */
private final JobVertexID jobVertexId;
- /** Key group index the KvState instance belongs to. */
- private final int keyGroupIndex;
+ /** Key group range the KvState instance belongs to. */
+ private final KeyGroupRange keyGroupRange;
/** Name under which the KvState has been registered. */
private final String registrationName;
@@ -114,7 +115,7 @@ public interface KvStateMessage extends Serializable {
*
* @param jobId JobID the KvState instance belongs to
* @param jobVertexId JobVertexID the KvState instance belongs to
- * @param keyGroupIndex Key group index the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState has been registered
* @param kvStateId ID of the registered KvState instance
* @param kvStateServerAddress Server address where to find the KvState instance
@@ -122,15 +123,15 @@ public interface KvStateMessage extends Serializable {
public NotifyKvStateRegistered(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId,
KvStateServerAddress kvStateServerAddress) {
this.jobId = Preconditions.checkNotNull(jobId, "JobID");
this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
- Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
- this.keyGroupIndex = keyGroupIndex;
+ Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP);
+ this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID");
this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
@@ -159,8 +160,8 @@ public interface KvStateMessage extends Serializable {
*
* @return Key group range the KvState instance belongs to
*/
- public int getKeyGroupIndex() {
- return keyGroupIndex;
+ public KeyGroupRange getKeyGroupRange() {
+ return keyGroupRange;
}
/**
@@ -195,7 +196,7 @@ public interface KvStateMessage extends Serializable {
return "NotifyKvStateRegistered{" +
"jobId=" + jobId +
", jobVertexId=" + jobVertexId +
- ", keyGroupIndex=" + keyGroupIndex +
+ ", keyGroupRange=" + keyGroupRange +
", registrationName='" + registrationName + '\'' +
", kvStateId=" + kvStateId +
", kvStateServerAddress=" + kvStateServerAddress +
@@ -214,7 +215,7 @@ public interface KvStateMessage extends Serializable {
private final JobVertexID jobVertexId;
/** Key group range the KvState instance belongs to. */
- private final int keyGroupIndex;
+ private final KeyGroupRange keyGroupRange;
/** Name under which the KvState has been registered. */
private final String registrationName;
@@ -224,19 +225,19 @@ public interface KvStateMessage extends Serializable {
*
* @param jobId JobID the KvState instance belongs to
* @param jobVertexId JobVertexID the KvState instance belongs to
- * @param keyGroupIndex Key group index the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState has been registered
*/
public NotifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName) {
this.jobId = Preconditions.checkNotNull(jobId, "JobID");
this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID");
- Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index");
- this.keyGroupIndex = keyGroupIndex;
+ Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP);
+ this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name");
}
@@ -263,8 +264,8 @@ public interface KvStateMessage extends Serializable {
*
* @return Key group range the KvState instance belongs to
*/
- public int getKeyGroupIndex() {
- return keyGroupIndex;
+ public KeyGroupRange getKeyGroupRange() {
+ return keyGroupRange;
}
/**
@@ -281,7 +282,7 @@ public interface KvStateMessage extends Serializable {
return "NotifyKvStateUnregistered{" +
"jobId=" + jobId +
", jobVertexId=" + jobVertexId +
- ", keyGroupIndex=" + keyGroupIndex +
+ ", keyGroupRange=" + keyGroupRange +
", registrationName='" + registrationName + '\'' +
'}';
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index f19c123..f57ae47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.taskmanager.Task;
@@ -80,7 +81,7 @@ public class KvStateRegistry {
*
* @param jobId JobId the KvState instance belongs to
* @param jobVertexId JobVertexID the KvState instance belongs to
- * @param keyGroupIndex Key group index the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState is registered
* @param kvState KvState instance to be registered
* @return Assigned KvStateID
@@ -88,7 +89,7 @@ public class KvStateRegistry {
public KvStateID registerKvState(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName,
KvState<?> kvState) {
@@ -100,7 +101,7 @@ public class KvStateRegistry {
listener.notifyKvStateRegistered(
jobId,
jobVertexId,
- keyGroupIndex,
+ keyGroupRange,
registrationName,
kvStateId);
}
@@ -116,11 +117,12 @@ public class KvStateRegistry {
*
* @param jobId JobId the KvState instance belongs to
* @param kvStateId KvStateID to identify the KvState instance
+ * @param keyGroupRange Key group range the KvState instance belongs to
*/
public void unregisterKvState(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId) {
@@ -130,7 +132,7 @@ public class KvStateRegistry {
listener.notifyKvStateUnregistered(
jobId,
jobVertexId,
- keyGroupIndex,
+ keyGroupRange,
registrationName);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
index 760adf1..29bee9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
/**
* A listener for a {@link KvStateRegistry}.
@@ -34,14 +35,14 @@ public interface KvStateRegistryListener {
*
* @param jobId Job ID the KvState instance belongs to
* @param jobVertexId JobVertexID the KvState instance belongs to
- * @param keyGroupIndex Key group index the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState is registered
* @param kvStateId ID of the KvState instance
*/
void notifyKvStateRegistered(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId);
@@ -50,13 +51,13 @@ public interface KvStateRegistryListener {
*
* @param jobId Job ID the KvState instance belongs to
* @param jobVertexId JobVertexID the KvState instance belongs to
- * @param keyGroupIndex Key group index the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName Name under which the KvState is registered
*/
void notifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 0e1ea57..591c67d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -36,8 +36,8 @@ import org.apache.flink.runtime.query.netty.KvStateClient;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -296,7 +296,7 @@ public class QueryableStateClient {
.flatMap(new Mapper<KvStateLocation, Future<byte[]>>() {
@Override
public Future<byte[]> apply(KvStateLocation lookup) {
- int keyGroupIndex = MathUtils.murmurHash(keyHashCode) % lookup.getNumKeyGroups();
+ int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups());
KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
if (serverAddress == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index b5c09aa..d831214 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
@@ -52,15 +53,15 @@ public class TaskKvStateRegistry {
/**
* Registers the KvState instance at the KvStateRegistry.
*
- * @param keyGroupIndex KeyGroupIndex the KvState instance belongs to
+ * @param keyGroupRange Key group range the KvState instance belongs to
* @param registrationName The registration name (not necessarily the same
* as the KvState name defined in the state
* descriptor used to create the KvState instance)
* @param kvState The KvState instance to register
*/
- public void registerKvState(int keyGroupIndex, String registrationName, KvState<?> kvState) {
- KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupIndex, registrationName, kvState);
- registeredKvStates.add(new KvStateInfo(keyGroupIndex, registrationName, kvStateId));
+ public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, KvState<?> kvState) {
+ KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState);
+ registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId));
}
/**
@@ -68,7 +69,7 @@ public class TaskKvStateRegistry {
*/
public void unregisterAll() {
for (KvStateInfo kvState : registeredKvStates) {
- registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupIndex, kvState.registrationName, kvState.kvStateId);
+ registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupRange, kvState.registrationName, kvState.kvStateId);
}
}
@@ -77,14 +78,14 @@ public class TaskKvStateRegistry {
*/
private static class KvStateInfo {
- private final int keyGroupIndex;
+ private final KeyGroupRange keyGroupRange;
private final String registrationName;
private final KvStateID kvStateId;
- public KvStateInfo(int keyGroupIndex, String registrationName, KvStateID kvStateId) {
- this.keyGroupIndex = keyGroupIndex;
+ public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
+ this.keyGroupRange = keyGroupRange;
this.registrationName = registrationName;
this.kvStateId = kvStateId;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index eceb6f4..894f721 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -48,8 +48,19 @@ public final class KeyGroupRangeAssignment {
* @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
* @return the key-group to which the given key is assigned
*/
- public static final int assignToKeyGroup(Object key, int maxParallelism) {
- return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+ public static int assignToKeyGroup(Object key, int maxParallelism) {
+ return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
+ }
+
+ /**
+ * Assigns the given key to a key-group index.
+ *
+ * @param keyHash the hash of the key to assign
+ * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
+ * @return the key-group to which the given key is assigned
+ */
+ public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
+ return MathUtils.murmurHash(keyHash) % maxParallelism;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 8db63ee..5612f73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -269,8 +269,7 @@ public abstract class KeyedStateBackend<K> {
}
String name = stateDescriptor.getQueryableStateName();
- // TODO: deal with key group indices here
- kvStateRegistry.registerKvState(0, name, kvState);
+ kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
}
return state;
@@ -336,4 +335,9 @@ public abstract class KeyedStateBackend<K> {
long checkpointId,
long timestamp,
CheckpointStreamFactory streamFactory) throws Exception;
+
+
+ public KeyGroupRange getKeyGroupRange() {
+ return keyGroupRange;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
index 2d69938..4404867 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.Preconditions;
/**
@@ -49,14 +50,14 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe
public void notifyKvStateRegistered(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId) {
Object msg = new KvStateMessage.NotifyKvStateRegistered(
jobId,
jobVertexId,
- keyGroupIndex,
+ keyGroupRange,
registrationName,
kvStateId,
kvStateServerAddress);
@@ -68,13 +69,13 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe
public void notifyKvStateUnregistered(
JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName) {
Object msg = new KvStateMessage.NotifyKvStateUnregistered(
jobId,
jobVertexId,
- keyGroupIndex,
+ keyGroupRange,
registrationName);
jobManager.tell(msg);
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 639c158..fd96f86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1500,7 +1500,7 @@ class JobManager(
graph.getKvStateLocationRegistry.notifyKvStateRegistered(
msg.getJobVertexId,
- msg.getKeyGroupIndex,
+ msg.getKeyGroupRange,
msg.getRegistrationName,
msg.getKvStateId,
msg.getKvStateServerAddress)
@@ -1519,7 +1519,7 @@ class JobManager(
try {
graph.getKvStateLocationRegistry.notifyKvStateUnregistered(
msg.getJobVertexId,
- msg.getKeyGroupIndex,
+ msg.getKeyGroupRange,
msg.getRegistrationName)
} catch {
case t: Throwable =>
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index f925d62..d731b95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -467,7 +468,7 @@ public class JobManagerTest {
NotifyKvStateRegistered registerNonExistingJob = new NotifyKvStateRegistered(
new JobID(),
new JobVertexID(),
- 0,
+ new KeyGroupRange(0, 0),
"any-name",
new KvStateID(),
new KvStateServerAddress(InetAddress.getLocalHost(), 1233));
@@ -492,7 +493,7 @@ public class JobManagerTest {
NotifyKvStateRegistered registerForExistingJob = new NotifyKvStateRegistered(
jobGraph.getJobID(),
jobVertex1.getID(),
- 0,
+ new KeyGroupRange(0, 0),
"register-me",
new KvStateID(),
new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
@@ -512,11 +513,12 @@ public class JobManagerTest {
assertEquals(jobGraph.getJobID(), location.getJobId());
assertEquals(jobVertex1.getID(), location.getJobVertexId());
- assertEquals(jobVertex1.getParallelism(), location.getNumKeyGroups());
+ assertEquals(jobVertex1.getMaxParallelism(), location.getNumKeyGroups());
assertEquals(1, location.getNumRegisteredKeyGroups());
- int keyGroupIndex = registerForExistingJob.getKeyGroupIndex();
- assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupIndex));
- assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupIndex));
+ KeyGroupRange keyGroupRange = registerForExistingJob.getKeyGroupRange();
+ assertEquals(1, keyGroupRange.getNumberOfKeyGroups());
+ assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupRange.getStartKeyGroup()));
+ assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
//
// Unregistration
@@ -524,7 +526,7 @@ public class JobManagerTest {
NotifyKvStateUnregistered unregister = new NotifyKvStateUnregistered(
registerForExistingJob.getJobId(),
registerForExistingJob.getJobVertexId(),
- registerForExistingJob.getKeyGroupIndex(),
+ registerForExistingJob.getKeyGroupRange(),
registerForExistingJob.getRegistrationName());
jobManager.tell(unregister);
@@ -546,7 +548,7 @@ public class JobManagerTest {
NotifyKvStateRegistered register = new NotifyKvStateRegistered(
jobGraph.getJobID(),
jobVertex1.getID(),
- 0,
+ new KeyGroupRange(0, 0),
"duplicate-me",
new KvStateID(),
new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
@@ -554,7 +556,7 @@ public class JobManagerTest {
NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered(
jobGraph.getJobID(),
jobVertex2.getID(), // <--- different operator, but...
- 0,
+ new KeyGroupRange(0, 0),
"duplicate-me", // ...same name
new KvStateID(),
new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
index 70f0ba2..f8005a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.junit.Test;
import java.net.InetAddress;
@@ -52,8 +53,8 @@ public class KvStateLocationRegistryTest {
// IDs for each key group of each vertex
KvStateID[][] ids = new KvStateID[vertices.length][];
for (int i = 0; i < ids.length; i++) {
- ids[i] = new KvStateID[vertices[i].getParallelism()];
- for (int j = 0; j < vertices[i].getParallelism(); j++) {
+ ids[i] = new KvStateID[vertices[i].getMaxParallelism()];
+ for (int j = 0; j < vertices[i].getMaxParallelism(); j++) {
ids[i][j] = new KvStateID();
}
}
@@ -66,12 +67,12 @@ public class KvStateLocationRegistryTest {
// Register
for (int i = 0; i < vertices.length; i++) {
- int numKeyGroups = vertices[i].getParallelism();
+ int numKeyGroups = vertices[i].getMaxParallelism();
for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
// Register
registry.notifyKvStateRegistered(
vertices[i].getJobVertexId(),
- keyGroupIndex,
+ new KeyGroupRange(keyGroupIndex, keyGroupIndex),
registrationNames[i],
ids[i][keyGroupIndex],
server);
@@ -83,8 +84,8 @@ public class KvStateLocationRegistryTest {
KvStateLocation location = registry.getKvStateLocation(registrationNames[i]);
assertNotNull(location);
- int parallelism = vertices[i].getParallelism();
- for (int keyGroupIndex = 0; keyGroupIndex < parallelism; keyGroupIndex++) {
+ int maxParallelism = vertices[i].getMaxParallelism();
+ for (int keyGroupIndex = 0; keyGroupIndex < maxParallelism; keyGroupIndex++) {
assertEquals(ids[i][keyGroupIndex], location.getKvStateID(keyGroupIndex));
assertEquals(server, location.getKvStateServerAddress(keyGroupIndex));
}
@@ -92,10 +93,10 @@ public class KvStateLocationRegistryTest {
// Unregister
for (int i = 0; i < vertices.length; i++) {
- int numKeyGroups = vertices[i].getParallelism();
+ int numKeyGroups = vertices[i].getMaxParallelism();
JobVertexID jobVertexId = vertices[i].getJobVertexId();
for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
- registry.notifyKvStateUnregistered(jobVertexId, keyGroupIndex, registrationNames[i]);
+ registry.notifyKvStateUnregistered(jobVertexId, new KeyGroupRange(keyGroupIndex, keyGroupIndex), registrationNames[i]);
}
}
@@ -121,7 +122,7 @@ public class KvStateLocationRegistryTest {
// First operator registers
registry.notifyKvStateRegistered(
vertices[0].getJobVertexId(),
- 0,
+ new KeyGroupRange(0, 0),
registrationName,
new KvStateID(),
new KvStateServerAddress(InetAddress.getLocalHost(), 12328));
@@ -130,7 +131,7 @@ public class KvStateLocationRegistryTest {
// Second operator registers same name
registry.notifyKvStateRegistered(
vertices[1].getJobVertexId(),
- 0,
+ new KeyGroupRange(0, 0),
registrationName,
new KvStateID(),
new KvStateServerAddress(InetAddress.getLocalHost(), 12032));
@@ -151,7 +152,7 @@ public class KvStateLocationRegistryTest {
KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap);
try {
- registry.notifyKvStateUnregistered(vertex.getJobVertexId(), 0, "any-name");
+ registry.notifyKvStateUnregistered(vertex.getJobVertexId(), new KeyGroupRange(0, 0), "any-name");
fail("Did not throw expected Exception, because of missing registration");
} catch (IllegalArgumentException ignored) {
// Expected
@@ -179,7 +180,7 @@ public class KvStateLocationRegistryTest {
// First operator registers name
registry.notifyKvStateRegistered(
vertices[0].getJobVertexId(),
- 0,
+ new KeyGroupRange(0, 0),
name,
new KvStateID(),
mock(KvStateServerAddress.class));
@@ -190,7 +191,7 @@ public class KvStateLocationRegistryTest {
registry.notifyKvStateUnregistered(
vertices[0].getJobVertexId(),
- notRegisteredKeyGroupIndex,
+ new KeyGroupRange(notRegisteredKeyGroupIndex, notRegisteredKeyGroupIndex),
name);
fail("Did not throw expected Exception");
@@ -201,7 +202,7 @@ public class KvStateLocationRegistryTest {
// Wrong operator tries to unregister
registry.notifyKvStateUnregistered(
vertices[1].getJobVertexId(),
- 0,
+ new KeyGroupRange(0, 0),
name);
fail("Did not throw expected Exception");
@@ -210,13 +211,13 @@ public class KvStateLocationRegistryTest {
}
// ------------------------------------------------------------------------
-
- private ExecutionJobVertex createJobVertex(int parallelism) {
+
+ private ExecutionJobVertex createJobVertex(int maxParallelism) {
JobVertexID id = new JobVertexID();
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
when(vertex.getJobVertexId()).thenReturn(id);
- when(vertex.getParallelism()).thenReturn(parallelism);
+ when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
return vertex;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
index 59ac575..ed51f62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.junit.Test;
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -36,55 +39,87 @@ public class KvStateLocationTest {
JobID jobId = new JobID();
JobVertexID jobVertexId = new JobVertexID();
int numKeyGroups = 123;
+ int numRanges = 10;
+ int fract = numKeyGroups / numRanges;
+ int remain = numKeyGroups % numRanges;
+ List<KeyGroupRange> keyGroupRanges = new ArrayList<>(numRanges);
+
+ int start = 0;
+ for (int i = 0; i < numRanges; ++i) {
+ int end = start + fract - 1;
+ if(remain > 0) {
+ --remain;
+ ++end;
+ }
+ KeyGroupRange range = new KeyGroupRange(start, end);
+ keyGroupRanges.add(range);
+ start = end + 1;
+ }
+
+ System.out.println(keyGroupRanges);
+
String registrationName = "asdasdasdasd";
KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName);
- KvStateID[] kvStateIds = new KvStateID[numKeyGroups];
- KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numKeyGroups];
+ KvStateID[] kvStateIds = new KvStateID[numRanges];
+ KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numRanges];
InetAddress host = InetAddress.getLocalHost();
// Register
- for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
- kvStateIds[keyGroupIndex] = new KvStateID();
- serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
-
- location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
- assertEquals(keyGroupIndex + 1, location.getNumRegisteredKeyGroups());
+ int registeredCount = 0;
+ for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+ kvStateIds[rangeIdx] = new KvStateID();
+ serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
+ KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+ location.registerKvState(keyGroupRange, kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
+ registeredCount += keyGroupRange.getNumberOfKeyGroups();
+ assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
}
// Lookup
- for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
- assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
- assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+ for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+ KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+ for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+ assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup));
+ assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup));
+ }
}
// Overwrite
- for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
- kvStateIds[keyGroupIndex] = new KvStateID();
- serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex);
+ for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+ kvStateIds[rangeIdx] = new KvStateID();
+ serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
- location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]);
- assertEquals(numKeyGroups, location.getNumRegisteredKeyGroups());
+ location.registerKvState(keyGroupRanges.get(rangeIdx), kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
+ assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
}
// Lookup
- for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
- assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex));
- assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex));
+ for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+ KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+ for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+ assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup));
+ assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup));
+ }
}
// Unregister
- for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
- location.unregisterKvState(keyGroupIndex);
- assertEquals(numKeyGroups - keyGroupIndex - 1, location.getNumRegisteredKeyGroups());
+ for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+ KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+ location.unregisterKvState(keyGroupRange);
+ registeredCount -= keyGroupRange.getNumberOfKeyGroups();
+ assertEquals(registeredCount, location.getNumRegisteredKeyGroups());
}
// Lookup
- for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) {
- assertEquals(null, location.getKvStateID(keyGroupIndex));
- assertEquals(null, location.getKvStateServerAddress(keyGroupIndex));
+ for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
+ KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx);
+ for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
+ assertEquals(null, location.getKvStateID(keyGroup));
+ assertEquals(null, location.getKvStateServerAddress(keyGroup));
+ }
}
assertEquals(0, location.getNumRegisteredKeyGroups());
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 405f962..1039568 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -154,7 +154,7 @@ public class QueryableStateClientTest {
KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3);
for (int i = 0; i < numKeyGroups; i++) {
- location.registerKvState(i, kvStateId, serverAddress);
+ location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
}
when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
@@ -184,7 +184,7 @@ public class QueryableStateClientTest {
serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4);
for (int i = 0; i < numKeyGroups; i++) {
- location.registerKvState(i, kvStateId, serverAddress);
+ location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress);
}
when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
@@ -281,7 +281,7 @@ public class QueryableStateClientTest {
kvStateIds[i] = registries[i].registerKvState(
jobId,
new JobVertexID(),
- i, // key group index
+ new KeyGroupRange(i, i),
"choco",
kvState);
}
@@ -302,7 +302,7 @@ public class QueryableStateClientTest {
// Location lookup service
KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
- location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
+ location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress());
}
KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
@@ -385,7 +385,7 @@ public class QueryableStateClientTest {
// Exact contents don't matter here
KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
- location.registerKvState(0, new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
+ location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
JobID jobId1 = new JobID();
JobID jobId2 = new JobID();
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index f785174..c8fb4bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -595,7 +595,7 @@ public class KvStateClientTest {
KvState<?> kvState = (KvState<?>) state;
// Register KvState (one state instance for all server)
- ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), 0, "any", kvState);
+ ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
}
final KvStateClient finalClient = client;
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 52c807f..7e6d713 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -278,7 +278,7 @@ public class KvStateServerHandlerTest {
KvStateID kvStateId = registry.registerKvState(
new JobID(),
new JobVertexID(),
- 0,
+ new KeyGroupRange(0, 0),
"vanilla",
kvState);
@@ -681,18 +681,18 @@ public class KvStateServerHandlerTest {
*/
static class TestRegistryListener implements KvStateRegistryListener {
volatile JobVertexID jobVertexID;
- volatile int keyGroupIndex;
+ volatile KeyGroupRange keyGroupIndex;
volatile String registrationName;
volatile KvStateID kvStateId;
@Override
public void notifyKvStateRegistered(JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName,
KvStateID kvStateId) {
this.jobVertexID = jobVertexId;
- this.keyGroupIndex = keyGroupIndex;
+ this.keyGroupIndex = keyGroupRange;
this.registrationName = registrationName;
this.kvStateId = kvStateId;
}
@@ -700,7 +700,7 @@ public class KvStateServerHandlerTest {
@Override
public void notifyKvStateUnregistered(JobID jobId,
JobVertexID jobVertexId,
- int keyGroupIndex,
+ KeyGroupRange keyGroupRange,
String registrationName) {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 33ec182..73e2808 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1108,6 +1108,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
CheckpointStreamFactory streamFactory = createStreamFactory();
KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+ KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange();
KvStateRegistryListener listener = mock(KvStateRegistryListener.class);
registry.registerListener(listener);
@@ -1122,7 +1123,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
// Verify registered
verify(listener, times(1)).notifyKvStateRegistered(
- eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+ eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory));
@@ -1130,7 +1131,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.close();
verify(listener, times(1)).notifyKvStateUnregistered(
- eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"));
+ eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"));
backend.close();
// Initialize again
backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
@@ -1140,12 +1141,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
// Verify registered again
verify(listener, times(2)).notifyKvStateRegistered(
- eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class));
+ eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
backend.close();
}
-
+
@Test
public void testEmptyStateCheckpointing() {
http://git-wip-us.apache.org/repos/asf/flink/blob/181b5451/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index b99858a..1259460 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -93,7 +93,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-@Ignore
public class QueryableStateITCase extends TestLogger {
private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
[3/5] flink git commit: [FLINK-4663] [jdbc] Fix JDBCOutputFormat log message
Posted by se...@apache.org.
[FLINK-4663] [jdbc] Fix JDBCOutputFormat log message
This closes #2534
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/335a2826
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/335a2826
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/335a2826
Branch: refs/heads/master
Commit: 335a2826481725aabc86de8068909060894a274b
Parents: 181b545
Author: swapnil-chougule <sw...@pubmatic.com>
Authored: Thu Sep 22 18:01:20 2016 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:26:02 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/335a2826/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 5464a94..da4b1ad 100644
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -108,7 +108,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
@Override
public void writeRecord(Row row) throws IOException {
- if (typesArray != null && typesArray.length > 0 && typesArray.length == row.productArity()) {
+ if (typesArray != null && typesArray.length > 0 && typesArray.length != row.productArity()) {
LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
}
try {
[5/5] flink git commit: [FLINK-4666] [core] Declare constants as final in ParameterTool
Posted by se...@apache.org.
[FLINK-4666] [core] Declare constants as final in ParameterTool
This closes #2538
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/568845a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/568845a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/568845a3
Branch: refs/heads/master
Commit: 568845a3cfea1f6fe71a99f1f546cbd4d947fb33
Parents: 37c9512
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Thu Sep 22 22:54:29 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:34:42 2016 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/api/java/utils/ParameterTool.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/568845a3/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 1c03c08..8f504e4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -45,8 +45,8 @@ import java.util.Properties;
public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
- protected static String NO_VALUE_KEY = "__NO_VALUE_KEY";
- protected static String DEFAULT_UNDEFINED = "<undefined>";
+ protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
+ protected static final String DEFAULT_UNDEFINED = "<undefined>";
// ------------------ Constructors ------------------------
[4/5] flink git commit: [FLINK-4665] [misc] Remove unnecessary boxing/unboxing of primitive types in various places
Posted by se...@apache.org.
[FLINK-4665] [misc] Remove unnecessary boxing/unboxing of primitive types in various places
This closes #2537
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37c9512f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37c9512f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37c9512f
Branch: refs/heads/master
Commit: 37c9512fadb2b2ad6d3167b97d25fd00a2fa7620
Parents: 335a282
Author: Alexander Pivovarov <ap...@gmail.com>
Authored: Thu Sep 22 20:27:08 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 12:33:18 2016 +0200
----------------------------------------------------------------------
.../api/java/sca/NestedMethodAnalyzer.java | 2 +-
.../aggregation/FloatSummaryAggregator.java | 2 +-
.../flink/api/java/utils/ParameterTool.java | 16 +++++++---------
.../flink/api/java/io/CsvInputFormatTest.java | 20 ++++++++++----------
.../api/java/sca/UdfAnalyzerExamplesTest.java | 2 +-
.../flink/api/java/sca/UdfAnalyzerTest.java | 4 ++--
6 files changed, 22 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
index 302df4e..7304c50 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/NestedMethodAnalyzer.java
@@ -607,7 +607,7 @@ public class NestedMethodAnalyzer extends BasicInterpreter {
else {
final int constant = tagged(values.get(2)).getIntConstant();
- if (constant < 0 || Integer.valueOf(methodOwner.split("Tuple")[1]) <= constant ) {
+ if (constant < 0 || Integer.parseInt(methodOwner.split("Tuple")[1]) <= constant ) {
analyzer.handleInvalidTupleAccess();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
index 745ca41..bc78841 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/summarize/aggregation/FloatSummaryAggregator.java
@@ -89,7 +89,7 @@ public class FloatSummaryAggregator extends NumericSummaryAggregator<Float> {
@Override
public Float result() {
// overflow will go to infinity
- return new Double(sum.value()).floatValue();
+ return (float) sum.value();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index a9389a5..1c03c08 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -19,8 +19,8 @@ package org.apache.flink.api.java.utils;
import org.apache.commons.cli.Option;
import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
@@ -275,7 +275,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
public int getInt(String key) {
addToDefaults(key, null);
String value = getRequired(key);
- return Integer.valueOf(value);
+ return Integer.parseInt(value);
}
/**
@@ -285,11 +285,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
public int getInt(String key, int defaultValue) {
addToDefaults(key, Integer.toString(defaultValue));
String value = get(key);
- if(value == null) {
+ if (value == null) {
return defaultValue;
- } else {
- return Integer.valueOf(value);
}
+ return Integer.parseInt(value);
}
// -------------- LONG
@@ -301,7 +300,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
public long getLong(String key) {
addToDefaults(key, null);
String value = getRequired(key);
- return Long.valueOf(value);
+ return Long.parseLong(value);
}
/**
@@ -311,11 +310,10 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
public long getLong(String key, long defaultValue) {
addToDefaults(key, Long.toString(defaultValue));
String value = get(key);
- if(value == null) {
+ if (value == null) {
return defaultValue;
- } else {
- return Long.valueOf(value);
}
+ return Long.parseLong(value);
}
// -------------- FLOAT
http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index fa091d9..54f226c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -116,19 +116,19 @@ public class CsvInputFormatTest {
if (recordCounter == 1) {
assertNotNull(result);
assertEquals("this is", result.f0);
- assertEquals(new Integer(1), result.f1);
+ assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
assertEquals((long) format.getCurrentState(), 15);
} else if (recordCounter == 2) {
assertNotNull(result);
assertEquals("a test", result.f0);
- assertEquals(new Integer(3), result.f1);
+ assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
assertEquals((long) format.getCurrentState(), 29);
} else if (recordCounter == 3) {
assertNotNull(result);
assertEquals("#next", result.f0);
- assertEquals(new Integer(5), result.f1);
+ assertEquals(Integer.valueOf(5), result.f1);
assertEquals(new Double(6.0), result.f2);
assertEquals((long) format.getCurrentState(), 42);
} else {
@@ -196,21 +196,21 @@ public class CsvInputFormatTest {
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
- assertEquals(new Integer(1), result.f1);
+ assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
assertEquals((long) format.getCurrentState(), 65);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
- assertEquals(new Integer(3), result.f1);
+ assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
assertEquals((long) format.getCurrentState(), 91);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("#next", result.f0);
- assertEquals(new Integer(5), result.f1);
+ assertEquals(Integer.valueOf(5), result.f1);
assertEquals(new Double(6.0), result.f2);
assertEquals((long) format.getCurrentState(), 104);
@@ -248,13 +248,13 @@ public class CsvInputFormatTest {
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
- assertEquals(new Integer(1), result.f1);
+ assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
- assertEquals(new Integer(3), result.f1);
+ assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
result = format.nextRecord(result);
@@ -292,13 +292,13 @@ public class CsvInputFormatTest {
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
- assertEquals(new Integer(1), result.f1);
+ assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
- assertEquals(new Integer(3), result.f1);
+ assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
result = format.nextRecord(result);
http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index 11ab5f2..01dc070 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -546,7 +546,7 @@ public class UdfAnalyzerExamplesTest {
for (int i = 0; i < split.length; i++) {
if (i == 42 - 1) {
- p.setLabel(new Integer(split[i].trim().substring(0, 1)));
+ p.setLabel(Integer.valueOf(split[i].trim().substring(0, 1)));
} else {
if (a < 42 && !split[i].trim().isEmpty()) {
features[a++] = Double.parseDouble(split[i].trim());
http://git-wip-us.apache.org/repos/asf/flink/blob/37c9512f/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index 20ce261..ac35793 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,7 +36,6 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -700,7 +700,7 @@ public class UdfAnalyzerTest {
@SuppressWarnings("unchecked")
@Override
public Tuple1<Tuple1<String>> map(Tuple1<Tuple1<String>> value) throws Exception {
- ((Tuple1<String>) value.getField(Integer.valueOf("2."))).f0 = "Hello";
+ ((Tuple1<String>) value.getField(Integer.parseInt("2."))).f0 = "Hello";
return value;
}
}