You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/08/10 15:50:13 UTC
flink git commit: [FLINK-2387] add streaming test case for live accumulators
Repository: flink
Updated Branches:
refs/heads/master dba2946f4 -> fdebcb83d
[FLINK-2387] add streaming test case for live accumulators
This closes #926.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdebcb83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdebcb83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdebcb83
Branch: refs/heads/master
Commit: fdebcb83d0b19aad4f37da14116c08972af06ff1
Parents: dba2946
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Jul 21 16:54:26 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Aug 10 15:48:18 2015 +0200
----------------------------------------------------------------------
.../functions/source/FromElementsFunction.java | 2 +-
.../accumulators/AccumulatorLiveITCase.java | 175 ++++++++++++++-----
2 files changed, 133 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fdebcb83/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 28544ee..af47f59 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -36,7 +36,7 @@ import java.util.Collection;
*
* <p>Upon construction, this source function serializes the elements using Flink's type information.
* That way, any object transport using Java serialization will not be affected by the serializability
- * if the elements.</p>
+ * of the elements.</p>
*
* @param <T> The type of elements returned by this function.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/fdebcb83/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 51d3eb9..020919e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -24,6 +24,7 @@ import akka.actor.Status;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
+import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
@@ -52,6 +53,8 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.After;
import org.junit.Before;
@@ -60,7 +63,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.io.IOException;
@@ -75,6 +77,17 @@ import static org.junit.Assert.*;
/**
* Tests the availability of accumulator results during runtime. The test case tests a user-defined
* accumulator and Flink's internal accumulators for two consecutive tasks.
+ *
+ * CHAINED[Source -> Map] -> Sink
+ *
+ * Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
+ * the task to the task manager which notifies the job manager and sends the current accumulators.
+ * The task blocks until the test has been notified about the current accumulator values.
+ *
+ * A barrier between the operators ensures that pipelining is disabled for the streaming test.
+ * The batch job reads the records one at a time. The streaming code buffers the records beforehand;
+ * that's why exact guarantees about the number of records read are very hard to make. This is why we
+ * check for a lower bound on the number of elements read.
*/
public class AccumulatorLiveITCase {
@@ -83,15 +96,17 @@ public class AccumulatorLiveITCase {
private static ActorSystem system;
private static ActorGateway jobManagerGateway;
private static ActorRef taskManager;
+
private static JobID jobID;
+ private static JobGraph jobGraph;
// name of user accumulator
- private static String NAME = "test";
+ private static String ACCUMULATOR_NAME = "test";
// number of heartbeat intervals to check
private static final int NUM_ITERATIONS = 5;
- private static List<String> inputData = new ArrayList<String>(NUM_ITERATIONS);
+ private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -113,29 +128,60 @@ public class AccumulatorLiveITCase {
for (int i=0; i < NUM_ITERATIONS; i++) {
inputData.add(i, String.valueOf(i+1));
}
+
+ NotifyingMapper.finished = false;
}
@After
public void after() throws Exception {
JavaTestKit.shutdownActorSystem(system);
+ inputData.clear();
}
@Test
- public void testProgram() throws Exception {
+ public void testBatch() throws Exception {
- new JavaTestKit(system) {{
+ /** The program **/
+ ExecutionEnvironment env = new BatchPlanExtractor();
+ env.setParallelism(1);
+
+ DataSet<String> input = env.fromCollection(inputData);
+ input
+ .flatMap(new NotifyingMapper())
+ .output(new NotifyingOutputFormat());
+
+ env.execute();
+
+ // Extract job graph and set job id for the task to notify of accumulator changes.
+ jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
+ jobID = jobGraph.getJobID();
+
+ verifyResults();
+ }
+
+
+ @Test
+ public void testStreaming() throws Exception {
+
+ StreamExecutionEnvironment env = new StreamJobExtractor();
+ env.setParallelism(1);
+
+ DataStream<String> input = env.fromCollection(inputData);
+ input
+ .flatMap(new NotifyingMapper())
+ .write(new NotifyingOutputFormat(), 1000).disableChaining();
- /** The program **/
- ExecutionEnvironment env = new PlanExtractor();
- DataSet<String> input = env.fromCollection(inputData);
- input
- .flatMap(new WaitingUDF())
- .output(new WaitingOutputFormat());
- env.execute();
+ env.execute();
- // Extract job graph and set job id for the task to notify of accumulator changes.
- JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan);
- jobID = jobGraph.getJobID();
+ jobGraph = ((StreamJobExtractor) env).graph;
+ jobID = jobGraph.getJobID();
+
+ verifyResults();
+ }
+
+
+ private static void verifyResults() {
+ new JavaTestKit(system) {{
ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
@@ -149,12 +195,12 @@ public class AccumulatorLiveITCase {
expectMsgClass(TIMEOUT, Status.Success.class);
- ExecutionAttemptID mapperTaskID = null;
-
TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators = msg.flinkAccumulators();
Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
+ ExecutionAttemptID mapperTaskID = null;
+
// find out the first task's execution attempt id
for (Map.Entry<ExecutionAttemptID, ?> entry : flinkAccumulators.entrySet()) {
if (entry.getValue() != null) {
@@ -163,8 +209,19 @@ public class AccumulatorLiveITCase {
}
}
+ ExecutionAttemptID sinkTaskID = null;
+
+ // find the second's task id
+ for (ExecutionAttemptID key : flinkAccumulators.keySet()) {
+ if (key != mapperTaskID) {
+ sinkTaskID = key;
+ break;
+ }
+ }
+
/* Check for accumulator values */
- if(checkUserAccumulators(0, userAccumulators) && checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) {
+ if(checkUserAccumulators(0, userAccumulators) &&
+ checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) {
LOG.info("Passed initial check for map task.");
} else {
fail("Wrong accumulator results when map task begins execution.");
@@ -172,7 +229,6 @@ public class AccumulatorLiveITCase {
int expectedAccVal = 0;
- ExecutionAttemptID sinkTaskID = null;
/* for mapper task */
for (int i = 1; i <= NUM_ITERATIONS; i++) {
@@ -186,8 +242,16 @@ public class AccumulatorLiveITCase {
LOG.info("{}", flinkAccumulators);
LOG.info("{}", userAccumulators);
- if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
- LOG.info("Passed round " + i);
+ if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
+ checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
+ LOG.info("Passed round #" + i);
+ } else if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
+ checkFlinkAccumulators(sinkTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
+ // we determined the wrong task id and need to switch the two here
+ ExecutionAttemptID temp = mapperTaskID;
+ mapperTaskID = sinkTaskID;
+ sinkTaskID = temp;
+ LOG.info("Passed round #" + i);
} else {
fail("Failed in round #" + i);
}
@@ -197,15 +261,8 @@ public class AccumulatorLiveITCase {
flinkAccumulators = msg.flinkAccumulators();
userAccumulators = msg.userAccumulators();
- // find the second's task id
- for (ExecutionAttemptID key : flinkAccumulators.keySet()) {
- if (key != mapperTaskID) {
- sinkTaskID = key;
- break;
- }
- }
-
- if(checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) {
+ if(checkUserAccumulators(expectedAccVal, userAccumulators) &&
+ checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) {
LOG.info("Passed initial check for sink task.");
} else {
fail("Wrong accumulator results when sink task begins execution.");
@@ -223,8 +280,9 @@ public class AccumulatorLiveITCase {
LOG.info("{}", flinkAccumulators);
LOG.info("{}", userAccumulators);
- if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(sinkTaskID, i, 0, i*4, 0, flinkAccumulators)) {
- LOG.info("Passed round " + i);
+ if (checkUserAccumulators(expectedAccVal, userAccumulators) &&
+ checkFlinkAccumulators(sinkTaskID, i, 0, i * 4, 0, flinkAccumulators)) {
+ LOG.info("Passed round #" + i);
} else {
fail("Failed in round #" + i);
}
@@ -235,9 +293,10 @@ public class AccumulatorLiveITCase {
}};
}
+
private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) {
LOG.info("checking user accumulators");
- return accumulatorMap.containsKey(NAME) && expected == ((IntCounter)accumulatorMap.get(NAME)).getLocalValue();
+ return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
}
private static boolean checkFlinkAccumulators(ExecutionAttemptID taskKey, int expectedRecordsIn, int expectedRecordsOut, int expectedBytesIn, int expectedBytesOut,
@@ -253,12 +312,12 @@ public class AccumulatorLiveITCase {
* The following two cases are for the DataSource and Map task
*/
case NUM_RECORDS_OUT:
- if(((LongCounter) entry.getValue()).getLocalValue() != expectedRecordsOut) {
+ if(((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsOut) {
return false;
}
break;
case NUM_BYTES_OUT:
- if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytesOut) {
+ if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesOut) {
return false;
}
break;
@@ -266,12 +325,12 @@ public class AccumulatorLiveITCase {
* The following two cases are for the DataSink task
*/
case NUM_RECORDS_IN:
- if (((LongCounter) entry.getValue()).getLocalValue() != expectedRecordsIn) {
+ if (((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsIn) {
return false;
}
break;
case NUM_BYTES_IN:
- if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytesIn) {
+ if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesIn) {
return false;
}
break;
@@ -284,15 +343,17 @@ public class AccumulatorLiveITCase {
/**
- * UDF that waits for at least the heartbeat interval's duration.
+ * UDF that notifies the task manager whenever it changes the accumulator values.
*/
- private static class WaitingUDF extends RichFlatMapFunction<String, Integer> {
+ private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
private IntCounter counter = new IntCounter();
+ private static boolean finished = false;
+
@Override
public void open(Configuration parameters) throws Exception {
- getRuntimeContext().addAccumulator(NAME, counter);
+ getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
notifyTaskManagerOfAccumulatorUpdate();
}
@@ -305,9 +366,16 @@ public class AccumulatorLiveITCase {
notifyTaskManagerOfAccumulatorUpdate();
}
+ @Override
+ public void close() throws Exception {
+ finished = true;
+ }
}
- private static class WaitingOutputFormat implements OutputFormat<Integer> {
+ /**
+ * Output format which notifies of accumulator changes and waits for the previous mapper to finish.
+ */
+ private static class NotifyingOutputFormat implements OutputFormat<Integer> {
@Override
public void configure(Configuration parameters) {
@@ -315,6 +383,11 @@ public class AccumulatorLiveITCase {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
+ while (!NotifyingMapper.finished) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+ }
notifyTaskManagerOfAccumulatorUpdate();
}
@@ -334,7 +407,7 @@ public class AccumulatorLiveITCase {
*/
public static void notifyTaskManagerOfAccumulatorUpdate() {
new JavaTestKit(system) {{
- Timeout timeout = new Timeout(Duration.create(5, "seconds"));
+ Timeout timeout = new Timeout(TIMEOUT);
Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
try {
Await.result(ask, timeout.duration());
@@ -354,7 +427,7 @@ public class AccumulatorLiveITCase {
return jgg.compileJobGraph(op);
}
- private static class PlanExtractor extends LocalEnvironment {
+ private static class BatchPlanExtractor extends LocalEnvironment {
private Plan plan = null;
@@ -363,6 +436,22 @@ public class AccumulatorLiveITCase {
plan = createProgramPlan();
return new JobExecutionResult(new JobID(), -1, null);
}
+ }
+
+
+ private static class StreamJobExtractor extends StreamExecutionEnvironment {
+
+ private JobGraph graph = null;
+ @Override
+ public JobExecutionResult execute() throws Exception {
+ return execute("default");
+ }
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ graph = this.streamGraph.getJobGraph();
+ return new JobExecutionResult(new JobID(), -1, null);
+ }
}
}