You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:43:53 UTC

[17/17] flink git commit: [FLINK-9316][streaming] Expose operator's unique ID in DataStream programs

[FLINK-9316][streaming] Expose operator's unique ID in DataStream programs

This allows operators to be identified uniquely and stably across multiple job submissions.
Previously, two different operators that were executed by tasks with the same name
were indistinguishable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fd694db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fd694db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fd694db

Branch: refs/heads/master
Commit: 3fd694db6f502e5df12476246dce05b1d1fc27bf
Parents: 67eac44
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue May 8 17:46:29 2018 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 16:42:30 2018 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  6 ++
 .../kinesis/testutils/TestRuntimeContext.java   |  6 ++
 flink-contrib/flink-storm/pom.xml               |  8 +++
 .../flink/storm/wrappers/BoltWrapperTest.java   | 18 ++---
 .../api/operators/StreamingRuntimeContext.java  | 15 ++++
 .../source/InputFormatSourceFunctionTest.java   |  6 ++
 .../api/operators/GetOperatorUniqueIDTest.java  | 75 ++++++++++++++++++++
 .../operators/StreamingRuntimeContextTest.java  |  4 ++
 8 files changed, 129 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 4605015..c9b5241 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -873,6 +874,11 @@ public class FlinkKafkaConsumerBaseTest {
 			public ExecutionConfig getExecutionConfig() {
 				return new ExecutionConfig();
 			}
+
+			@Override
+			public OperatorID getOperatorID() {
+				return new OperatorID();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
index 740d2f2..9a3ad72 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.testutils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -83,5 +84,10 @@ public class TestRuntimeContext extends StreamingRuntimeContext {
 		public ExecutionConfig getExecutionConfig() {
 			return new ExecutionConfig();
 		}
+
+		@Override
+		public OperatorID getOperatorID() {
+			return new OperatorID(42, 44);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
index 496aecd..fb52a93 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -180,6 +180,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index 430e4d8..d405a45 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -32,12 +32,12 @@ import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.storm.util.TestDummyBolt;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.MockStreamConfig;
 
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -159,7 +159,7 @@ public class BoltWrapperTest extends AbstractTest {
 		PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
 		final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
 		wrapper.open();
 
 		wrapper.processElement(record);
@@ -195,7 +195,7 @@ public class BoltWrapperTest extends AbstractTest {
 		}
 
 		final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw);
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output);
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), output);
 		wrapper.open();
 
 		final SplitStreamType splitRecord = new SplitStreamType<Integer>();
@@ -248,7 +248,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 			final IRichBolt bolt = mock(IRichBolt.class);
 			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+			wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));
 
 			wrapper.open();
 			verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
@@ -261,7 +261,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 			final IRichBolt bolt = mock(IRichBolt.class);
 			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
-			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+			wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));
 
 			wrapper.open();
 			verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class));
@@ -278,7 +278,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 			TestDummyBolt testBolt = new TestDummyBolt();
 			BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(testBolt);
-			wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class));
+			wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class));
 
 			wrapper.open();
 			for (Entry<String, String> entry : cfg.toMap().entrySet()) {
@@ -305,7 +305,7 @@ public class BoltWrapperTest extends AbstractTest {
 		final IRichBolt bolt = mock(IRichBolt.class);
 		BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
 
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
 		wrapper.open();
 
 		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class));
@@ -322,7 +322,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 		final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
 
-		wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class));
+		wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class));
 
 		wrapper.close();
 		wrapper.dispose();
@@ -379,7 +379,7 @@ public class BoltWrapperTest extends AbstractTest {
 		final CloseableRegistry closeableRegistry = new CloseableRegistry();
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);
 		when(mockTask.getCheckpointLock()).thenReturn(new Object());
-		when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
+		when(mockTask.getConfiguration()).thenReturn(new MockStreamConfig());
 		when(mockTask.getEnvironment()).thenReturn(env);
 		when(mockTask.getExecutionConfig()).thenReturn(execConfig);
 		when(mockTask.getCancelables()).thenReturn(closeableRegistry);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 1f42ccf..89c038f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -61,6 +61,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 
 	private final StreamConfig streamConfig;
 
+	private final String operatorUniqueID;
+
 	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
 									Environment env, Map<String, Accumulator<?, ?>> accumulators) {
 		super(env.getTaskInfo(),
@@ -73,6 +75,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		this.operator = operator;
 		this.taskEnvironment = env;
 		this.streamConfig = new StreamConfig(env.getTaskConfiguration());
+		this.operatorUniqueID = operator.getOperatorID().toString();
 	}
 
 	// ------------------------------------------------------------------------
@@ -90,6 +93,18 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		return operator.getProcessingTimeService();
 	}
 
+	/**
+	 * The returned value is guaranteed to be unique between operators within the same job and to
+	 * remain stable (i.e. identical) across job submissions.
+	 *
+	 * <p>This operation is currently only supported in Streaming (DataStream) contexts.
+	 *
+	 * @return String representation of the operator's unique id.
+	 */
+	public String getOperatorUniqueID() {
+		return operatorUniqueID;
+	}
+
 	// ------------------------------------------------------------------------
 	//  broadcast variables
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index 84a45d8..cad3df8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -308,6 +309,11 @@ public class InputFormatSourceFunctionTest {
 			public ExecutionConfig getExecutionConfig() {
 				return new ExecutionConfig();
 			}
+
+			@Override
+			public OperatorID getOperatorID() {
+				return new OperatorID();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java
new file mode 100644
index 0000000..9693e42
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the uid translation to {@link org.apache.flink.runtime.jobgraph.OperatorID}.
+ */
+@SuppressWarnings("serial")
+public class GetOperatorUniqueIDTest extends TestLogger {
+
+	/**
+	 * If the expected values ever change, double-check that the change is not breaking the contract of
+	 * {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions.
+	 */
+	@Test
+	public void testGetOperatorUniqueID() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+		env.fromElements(1, 2, 3)
+			.map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42")
+			.map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44");
+
+		env.execute();
+	}
+
+	private static class VerifyOperatorIDMapFunction extends AbstractRichFunction implements MapFunction<Integer, Integer> {
+		private static final long serialVersionUID = 6584823409744624276L;
+
+		private final String expectedOperatorUniqueID;
+
+		public VerifyOperatorIDMapFunction(String expectedOperatorUniqueID) {
+			this.expectedOperatorUniqueID = checkNotNull(expectedOperatorUniqueID);
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			assertEquals(expectedOperatorUniqueID, ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID());
+		}
+
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3fd694db/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 87667b2..e04cedd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -296,6 +297,7 @@ public class StreamingRuntimeContextTest {
 		}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(StateDescriptor.class));
 
 		when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
+		when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
 
 		return operatorMock;
 	}
@@ -333,6 +335,7 @@ public class StreamingRuntimeContextTest {
 		}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(ListStateDescriptor.class));
 
 		when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
+		when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
 		return operatorMock;
 	}
 
@@ -369,6 +372,7 @@ public class StreamingRuntimeContextTest {
 		}).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(MapStateDescriptor.class));
 
 		when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
+		when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
 		return operatorMock;
 	}