Posted to commits@flink.apache.org by mx...@apache.org on 2015/08/10 15:50:13 UTC

flink git commit: [FLINK-2387] add streaming test case for live accumulators

Repository: flink
Updated Branches:
  refs/heads/master dba2946f4 -> fdebcb83d


[FLINK-2387] add streaming test case for live accumulators

This closes #926.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdebcb83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdebcb83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdebcb83

Branch: refs/heads/master
Commit: fdebcb83d0b19aad4f37da14116c08972af06ff1
Parents: dba2946
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Jul 21 16:54:26 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Aug 10 15:48:18 2015 +0200

----------------------------------------------------------------------
 .../functions/source/FromElementsFunction.java  |   2 +-
 .../accumulators/AccumulatorLiveITCase.java     | 175 ++++++++++++++-----
 2 files changed, 133 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
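
For context, "live accumulators" means that user-defined and system accumulator values are
reported to the job manager while the job is still running, instead of only with the final job
result. The test below instruments the standard user-accumulator pattern; a minimal sketch of
that pattern (illustrative names, not part of this commit):

    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CountingMapper extends RichFlatMapFunction<String, Integer> {

        private final IntCounter counter = new IntCounter();

        @Override
        public void open(Configuration parameters) {
            // register under a name; reported values are aggregated per name at the job manager
            getRuntimeContext().addAccumulator("test", counter);
        }

        @Override
        public void flatMap(String value, Collector<Integer> out) {
            int i = Integer.parseInt(value);
            counter.add(i);
            out.collect(i);
        }
    }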


http://git-wip-us.apache.org/repos/asf/flink/blob/fdebcb83/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 28544ee..af47f59 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -36,7 +36,7 @@ import java.util.Collection;
  * 
  * <p>Upon construction, this source function serializes the elements using Flink's type information.
  * That way, any object transport using Java serialization will not be affected by the serializability
- * if the elements.</p>
+ * of the elements.</p>
  * 
  * @param <T> The type of elements returned by this function.
  */
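
The fixed sentence above documents a property worth spelling out: FromElementsFunction serializes
its elements with Flink's own type serializer at construction time, so shipping the function
around with Java serialization does not require the elements themselves to be
java.io.Serializable. A small sketch of direct construction (constructor signature as found in
this source tree; verify against your Flink version):

    import java.io.IOException;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.functions.source.FromElementsFunction;

    public class FromElementsExample {

        public static FromElementsFunction<String> createSource() throws IOException {
            TypeSerializer<String> serializer =
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig());

            // the elements are serialized right here, using the Flink serializer
            return new FromElementsFunction<>(serializer, "1", "2", "3");
        }
    }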

http://git-wip-us.apache.org/repos/asf/flink/blob/fdebcb83/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 51d3eb9..020919e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -24,6 +24,7 @@ import akka.actor.Status;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -52,6 +53,8 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Before;
@@ -60,7 +63,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -75,6 +77,17 @@ import static org.junit.Assert.*;
 /**
  * Tests the availability of accumulator results during runtime. The test case tests a user-defined
  * accumulator and Flink's internal accumulators for two consecutive tasks.
+ *
+ * CHAINED[Source -> Map] -> Sink
+ *
+ * Checks are performed as the elements arrive at the operators. Each check consists of a message
+ * sent by the task to the task manager, which in turn notifies the job manager and forwards the
+ * current accumulator values. The task blocks until the test has been notified about them.
+ *
+ * A barrier between the operators ensures that pipelining is disabled for the streaming test.
+ * The batch job reads the records one at a time; the streaming runtime buffers records beforehand,
+ * which makes exact guarantees about the number of records read very hard. That is why the checks
+ * only assert a lower bound on the number of elements read.
  */
 public class AccumulatorLiveITCase {
 
@@ -83,15 +96,17 @@ public class AccumulatorLiveITCase {
 	private static ActorSystem system;
 	private static ActorGateway jobManagerGateway;
 	private static ActorRef taskManager;
+
 	private static JobID jobID;
+	private static JobGraph jobGraph;
 
 	// name of user accumulator
-	private static String NAME = "test";
+	private static String ACCUMULATOR_NAME = "test";
 
 	// number of heartbeat intervals to check
 	private static final int NUM_ITERATIONS = 5;
 
-	private static List<String> inputData = new ArrayList<String>(NUM_ITERATIONS);
+	private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
 
 	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
 
@@ -113,29 +128,60 @@ public class AccumulatorLiveITCase {
 		for (int i=0; i < NUM_ITERATIONS; i++) {
 			inputData.add(i, String.valueOf(i+1));
 		}
+
+		NotifyingMapper.finished = false;
 	}
 
 	@After
 	public void after() throws Exception {
 		JavaTestKit.shutdownActorSystem(system);
+		inputData.clear();
 	}
 
 	@Test
-	public void testProgram() throws Exception {
+	public void testBatch() throws Exception {
 
-		new JavaTestKit(system) {{
+		/** The program **/
+		ExecutionEnvironment env = new BatchPlanExtractor();
+		env.setParallelism(1);
+
+		DataSet<String> input = env.fromCollection(inputData);
+		input
+				.flatMap(new NotifyingMapper())
+				.output(new NotifyingOutputFormat());
+
+		env.execute();
+
+		// Extract job graph and set job id for the task to notify of accumulator changes.
+		jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
+		jobID = jobGraph.getJobID();
+
+		verifyResults();
+	}
+
+
+	@Test
+	public void testStreaming() throws Exception {
+
+		StreamExecutionEnvironment env = new StreamJobExtractor();
+		env.setParallelism(1);
+
+		DataStream<String> input = env.fromCollection(inputData);
+		input
+				.flatMap(new NotifyingMapper())
+				.write(new NotifyingOutputFormat(), 1000).disableChaining();
 
-			/** The program **/
-			ExecutionEnvironment env = new PlanExtractor();
-			DataSet<String> input = env.fromCollection(inputData);
-			input
-					.flatMap(new WaitingUDF())
-					.output(new WaitingOutputFormat());
-			env.execute();
+		env.execute();
 
-			// Extract job graph and set job id for the task to notify of accumulator changes.
-			JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
-			jobID = jobGraph.getJobID();
+		jobGraph = ((StreamJobExtractor) env).graph;
+		jobID = jobGraph.getJobID();
+
+		verifyResults();
+	}
+
+
+	private static void verifyResults() {
+		new JavaTestKit(system) {{
 
 			ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
 
@@ -149,12 +195,12 @@ public class AccumulatorLiveITCase {
 			expectMsgClass(TIMEOUT, Status.Success.class);
 
 
-			ExecutionAttemptID mapperTaskID = null;
-
 			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
 			Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators = msg.flinkAccumulators();
 			Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
 
+			ExecutionAttemptID mapperTaskID = null;
+
 			// find out the first task's execution attempt id
 			for (Map.Entry<ExecutionAttemptID, ?> entry : flinkAccumulators.entrySet()) {
 				if (entry.getValue() != null) {
@@ -163,8 +209,19 @@ public class AccumulatorLiveITCase {
 				}
 			}
 
+			ExecutionAttemptID sinkTaskID = null;
+
+			// find the second task's execution attempt id
+			for (ExecutionAttemptID key : flinkAccumulators.keySet()) {
+				if (key != mapperTaskID) {
+					sinkTaskID = key;
+					break;
+				}
+			}
+
 			/* Check for accumulator values */
-			if(checkUserAccumulators(0, userAccumulators) && checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) {
+			if(checkUserAccumulators(0, userAccumulators) &&
+					checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) {
 				LOG.info("Passed initial check for map task.");
 			} else {
 				fail("Wrong accumulator results when map task begins execution.");
@@ -172,7 +229,6 @@ public class AccumulatorLiveITCase {
 
 
 			int expectedAccVal = 0;
-			ExecutionAttemptID sinkTaskID = null;
 
 			/* for mapper task */
 			for (int i = 1; i <= NUM_ITERATIONS; i++) {
@@ -186,8 +242,16 @@ public class AccumulatorLiveITCase {
 				LOG.info("{}", flinkAccumulators);
 				LOG.info("{}", userAccumulators);
 
-				if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
-					LOG.info("Passed round " + i);
+				if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
+						checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
+					LOG.info("Passed round #" + i);
+				} else if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
+						checkFlinkAccumulators(sinkTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
+					// we determined the wrong task id and need to switch the two here
+					ExecutionAttemptID temp = mapperTaskID;
+					mapperTaskID = sinkTaskID;
+					sinkTaskID = temp;
+					LOG.info("Passed round #" + i);
 				} else {
 					fail("Failed in round #" + i);
 				}
@@ -197,15 +261,8 @@ public class AccumulatorLiveITCase {
 			flinkAccumulators = msg.flinkAccumulators();
 			userAccumulators = msg.userAccumulators();
 
-			// find the second's task id
-			for (ExecutionAttemptID key : flinkAccumulators.keySet()) {
-				if (key != mapperTaskID) {
-					sinkTaskID = key;
-					break;
-				}
-			}
-
-			if(checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) {
+			if(checkUserAccumulators(expectedAccVal, userAccumulators) &&
+					checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) {
 				LOG.info("Passed initial check for sink task.");
 			} else {
 				fail("Wrong accumulator results when sink task begins execution.");
@@ -223,8 +280,9 @@ public class AccumulatorLiveITCase {
 				LOG.info("{}", flinkAccumulators);
 				LOG.info("{}", userAccumulators);
 
-				if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(sinkTaskID, i, 0, i*4, 0, flinkAccumulators)) {
-					LOG.info("Passed round " + i);
+				if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
+						checkFlinkAccumulators(sinkTaskID, i, 0, i * 4, 0, flinkAccumulators)) {
+					LOG.info("Passed round #" + i);
 				} else {
 					fail("Failed in round #" + i);
 				}
@@ -235,9 +293,10 @@ public class AccumulatorLiveITCase {
 		}};
 	}
 
+
 	private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) {
 		LOG.info("checking user accumulators");
-		return accumulatorMap.containsKey(NAME) && expected == ((IntCounter)accumulatorMap.get(NAME)).getLocalValue();
+		return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
 	}
 
 	private static boolean checkFlinkAccumulators(ExecutionAttemptID taskKey, int expectedRecordsIn, int expectedRecordsOut, int expectedBytesIn, int expectedBytesOut,
@@ -253,12 +312,12 @@ public class AccumulatorLiveITCase {
 				 * The following two cases are for the DataSource and Map task
 				 */
 				case NUM_RECORDS_OUT:
-					if(((LongCounter) entry.getValue()).getLocalValue() != expectedRecordsOut) {
+					if(((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsOut) {
 						return false;
 					}
 					break;
 				case NUM_BYTES_OUT:
-					if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytesOut) {
+					if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesOut) {
 						return false;
 					}
 					break;
@@ -266,12 +325,12 @@ public class AccumulatorLiveITCase {
 				 * The following two cases are for the DataSink task
 				 */
 				case NUM_RECORDS_IN:
-					if (((LongCounter) entry.getValue()).getLocalValue() != expectedRecordsIn) {
+					if (((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsIn) {
 						return false;
 					}
 					break;
 				case NUM_BYTES_IN:
-					if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytesIn) {
+					if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesIn) {
 						return false;
 					}
 					break;
@@ -284,15 +343,17 @@ public class AccumulatorLiveITCase {
 
 
 	/**
-	 * UDF that waits for at least the heartbeat interval's duration.
+	 * UDF that notifies the task manager when it changes the accumulator values.
 	 */
-	private static class WaitingUDF extends RichFlatMapFunction<String, Integer> {
+	private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
 
 		private IntCounter counter = new IntCounter();
 
+		private static volatile boolean finished = false;
+
 		@Override
 		public void open(Configuration parameters) throws Exception {
-			getRuntimeContext().addAccumulator(NAME, counter);
+			getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
 			notifyTaskManagerOfAccumulatorUpdate();
 		}
 
@@ -305,9 +366,16 @@ public class AccumulatorLiveITCase {
 			notifyTaskManagerOfAccumulatorUpdate();
 		}
 
+		@Override
+		public void close() throws Exception {
+			finished = true;
+		}
 	}
 
-	private static class WaitingOutputFormat implements OutputFormat<Integer> {
+	/**
+	 * Output format which notifies of accumulator changes and waits for the previous mapper to finish.
+	 */
+	private static class NotifyingOutputFormat implements OutputFormat<Integer> {
 
 		@Override
 		public void configure(Configuration parameters) {
@@ -315,6 +383,11 @@ public class AccumulatorLiveITCase {
 
 		@Override
 		public void open(int taskNumber, int numTasks) throws IOException {
+			while (!NotifyingMapper.finished) {
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {} // ignored; keep waiting for the mapper
+			}
 			notifyTaskManagerOfAccumulatorUpdate();
 		}
 
@@ -334,7 +407,7 @@ public class AccumulatorLiveITCase {
 	 */
 	public static void notifyTaskManagerOfAccumulatorUpdate() {
 		new JavaTestKit(system) {{
-			Timeout timeout = new Timeout(Duration.create(5, "seconds"));
+			Timeout timeout = new Timeout(TIMEOUT);
 			Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
 			try {
 				Await.result(ask, timeout.duration());
@@ -354,7 +427,7 @@ public class AccumulatorLiveITCase {
 		return jgg.compileJobGraph(op);
 	}
 
-	private static class PlanExtractor extends LocalEnvironment {
+	private static class BatchPlanExtractor extends LocalEnvironment {
 
 		private Plan plan = null;
 
@@ -363,6 +436,22 @@ public class AccumulatorLiveITCase {
 			plan = createProgramPlan();
 			return new JobExecutionResult(new JobID(), -1, null);
 		}
+	}
+
+
+	private static class StreamJobExtractor extends StreamExecutionEnvironment {
+
+		private JobGraph graph = null;
 
+		@Override
+		public JobExecutionResult execute() throws Exception {
+			return execute("default");
+		}
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			graph = this.streamGraph.getJobGraph();
+			return new JobExecutionResult(new JobID(), -1, null);
+		}
 	}
 }
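
A closing note on the extractor pattern above: BatchPlanExtractor and StreamJobExtractor override
execute() so that "running" the program only captures the compiled Plan or JobGraph; nothing is
executed. The test needs the JobID before submitting the job itself, in order to correlate the
accumulator notifications with the right job. The resulting flow inside the test (a condensed
sketch of testStreaming; the private graph field is accessible from the enclosing class):

    StreamExecutionEnvironment env = new StreamJobExtractor();
    env.setParallelism(1);

    env.fromCollection(inputData)
            .flatMap(new NotifyingMapper())
            .write(new NotifyingOutputFormat(), 1000).disableChaining();

    env.execute();                                    // captures the JobGraph, executes nothing
    jobGraph = ((StreamJobExtractor) env).graph;
    jobID = jobGraph.getJobID();                      // known before manual submission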