You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:31 UTC
[04/24] flink git commit: [FLINK-2808] [streaming] Refactor and
extend state backend abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 89672df..671544e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -18,15 +18,18 @@
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.junit.After;
@@ -36,6 +39,7 @@ import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
@@ -152,36 +156,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
+ final StreamTask<?, ?> mockTask = createMockTask();
AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 500 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 100 == 0);
assertTrue(op.getNextEvaluationTime() % 1100 == 0);
op.dispose();
@@ -194,25 +196,27 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testTumblingWindow() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final int windowSize = 50;
final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(
validatingIdentityFunction, identitySelector, windowSize, windowSize);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
final int numElements = 1000;
for (int i = 0; i < numElements; i++) {
- op.processElement(new StreamRecord<Integer>(i));
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(i));
+ }
Thread.sleep(1);
}
@@ -232,27 +236,32 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdown();
+ }
}
@Test
public void testSlidingWindow() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
// tumbling window that triggers every 20 milliseconds
AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
final int numElements = 1000;
for (int i = 0; i < numElements; i++) {
- op.processElement(new StreamRecord<Integer>(i));
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(i));
+ }
Thread.sleep(1);
}
@@ -288,6 +297,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdown();
+ }
}
@Test
@@ -296,39 +308,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
final Object lock = new Object();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
synchronized (lock) {
op.processElement(new StreamRecord<Integer>(1));
@@ -360,7 +348,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- } finally {
+ }
+ finally {
timerService.shutdown();
}
}
@@ -371,39 +360,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
final Object lock = new Object();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
synchronized (lock) {
op.processElement(new StreamRecord<Integer>(1));
@@ -426,31 +391,34 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- } finally {
+ }
+ finally {
timerService.shutdown();
}
}
@Test
public void testEmitTrailingDataOnClose() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// the operator has a window time that is so long that it will not fire in this test
final long oneYear = 365L * 24 * 60 * 60 * 1000;
AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
oneYear, oneYear);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
+
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
for (Integer i : data) {
- op.processElement(new StreamRecord<Integer>(i));
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(i));
+ }
}
op.close();
@@ -465,15 +433,18 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdown();
+ }
}
@Test
public void testPropagateExceptionsFromClose() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
@@ -483,11 +454,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
new AccumulatingProcessingTimeWindowOperator<>(
failingFunction, identitySelector, hundredYears, hundredYears);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
for (int i = 0; i < 150; i++) {
- op.processElement(new StreamRecord<Integer>(i));
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(i));
+ }
}
try {
@@ -506,6 +479,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdown();
+ }
}
// ------------------------------------------------------------------------
@@ -551,4 +527,49 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
}
}
+
+	/**
+	 * Creates a Mockito mock of {@link StreamTask} with the stubs the window operator tests rely on:
+	 * an empty accumulator map, the task name "Test task name", a fresh {@link ExecutionConfig},
+	 * and a mocked {@link Environment} that reports subtask index 0 out of 1 subtask and exposes
+	 * the class loader of this test class as the user class loader.
+	 *
+	 * @return the stubbed mock task (no timer behavior is stubbed here)
+	 */
+	private static StreamTask<?, ?> createMockTask() {
+		StreamTask<?, ?> task = mock(StreamTask.class);
+		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
+		when(task.getName()).thenReturn("Test task name");
+		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
+		Environment env = mock(Environment.class);
+		when(env.getIndexInSubtaskGroup()).thenReturn(0);
+		when(env.getNumberOfSubtasks()).thenReturn(1);
+		// use this test's own class rather than the sibling Aggregating test class
+		when(env.getUserClassLoader()).thenReturn(AccumulatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
+
+		when(task.getEnvironment()).thenReturn(env);
+
+		return task;
+	}
+
+	/**
+	 * Creates the mock task from {@link #createMockTask()} and additionally stubs
+	 * {@code registerTimer(long, Triggerable)}: every registration schedules the given
+	 * {@link Triggerable} on {@code timerService}, delayed by the difference between the
+	 * requested timestamp and the current system time. The trigger call itself is made
+	 * while holding {@code lock}.
+	 *
+	 * @param timerService executor on which the registered timers are scheduled
+	 * @param lock monitor that is held while a timer's {@code trigger} method runs
+	 * @return the stubbed mock task
+	 */
+	private static StreamTask<?, ?> createMockTaskWithTimer(
+			final ScheduledExecutorService timerService, final Object lock)
+	{
+		final StreamTask<?, ?> task = createMockTask();
+
+		final Answer<Void> scheduleOnExecutor = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final Object[] args = invocation.getArguments();
+				final Long fireAt = (Long) args[0];
+				final Triggerable callback = (Triggerable) args[1];
+
+				final Callable<Object> fire = new Callable<Object>() {
+					@Override
+					public Object call() throws Exception {
+						synchronized (lock) {
+							callback.trigger(fireAt);
+						}
+						return null;
+					}
+				};
+
+				// the delay is computed after the callable exists, as in the inline form
+				final long delayMillis = fireAt - System.currentTimeMillis();
+				timerService.schedule(fire, delayMillis, TimeUnit.MILLISECONDS);
+				return null;
+			}
+		};
+
+		doAnswer(scheduleOnExecutor).when(task).registerTimer(anyLong(), any(Triggerable.class));
+		return task;
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index fa90e4a..106e833 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -18,14 +18,17 @@
package org.apache.flink.streaming.runtime.operators.windowing;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.junit.After;
import org.junit.Test;
@@ -34,6 +37,7 @@ import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
@@ -149,36 +153,34 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
+ final StreamTask<?, ?> mockTask = createMockTask();
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
AggregatingProcessingTimeWindowOperator<String, String> op;
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 1000 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 500 == 0);
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
+ op.open();
assertTrue(op.getNextSlideTime() % 100 == 0);
assertTrue(op.getNextEvaluationTime() % 1100 == 0);
op.dispose();
@@ -191,19 +193,20 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testTumblingWindowUniqueElements() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final int windowSize = 50;
final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
new AggregatingProcessingTimeWindowOperator<>(
sumFunction, identitySelector, windowSize, windowSize);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
+
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
final int numElements = 1000;
@@ -228,6 +231,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdownNow();
+ }
}
@Test
@@ -239,37 +245,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final int windowSize = 50;
final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
final Object lock = new Object();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
new AggregatingProcessingTimeWindowOperator<>(
sumFunction, identitySelector, windowSize, windowSize);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
+
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
final int numWindows = 10;
@@ -315,23 +299,26 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testSlidingWindow() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
final int numElements = 1000;
for (int i = 0; i < numElements; i++) {
- op.processElement(new StreamRecord<Integer>(i));
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(i));
+ }
Thread.sleep(1);
}
@@ -366,6 +353,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdownNow();
+ }
}
@Test
@@ -374,38 +364,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
final Object lock = new Object();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
synchronized (lock) {
op.processElement(new StreamRecord<Integer>(1));
@@ -428,30 +395,33 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- } finally {
+ }
+ finally {
timerService.shutdown();
}
}
@Test
public void testEmitTrailingDataOnClose() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
// the operator has a window time that is so long that it will not fire in this test
final long oneYear = 365L * 24 * 60 * 60 * 1000;
AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, oneYear, oneYear);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
+
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
for (Integer i : data) {
- op.processElement(new StreamRecord<Integer>(i));
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(i));
+ }
}
op.close();
@@ -466,15 +436,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdown();
+ }
}
@Test
public void testPropagateExceptionsFromProcessElement() {
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
ReduceFunction<Integer> failingFunction = new FailingFunction(100);
@@ -484,11 +457,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
new AggregatingProcessingTimeWindowOperator<>(
failingFunction, identitySelector, hundredYears, hundredYears);
- op.setup(out, mockContext);
- op.open(new Configuration());
+ op.setup(mockTask, new StreamConfig(new Configuration()), out);
+ op.open();
for (int i = 0; i < 100; i++) {
- op.processElement(new StreamRecord<Integer>(1));
+ synchronized (lock) {
+ op.processElement(new StreamRecord<Integer>(1));
+ }
}
try {
@@ -505,6 +480,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ timerService.shutdown();
+ }
}
// ------------------------------------------------------------------------
@@ -546,4 +524,49 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
return value1 + value2;
}
}
+
+	/**
+	 * Builds a Mockito mock of {@link StreamTask} for the tests in this class. The mock returns
+	 * an empty accumulator map, the name "Test task name", a new {@link ExecutionConfig}, and a
+	 * mocked {@link Environment} reporting subtask index 0, one subtask in total, and this test
+	 * class's class loader as the user class loader.
+	 *
+	 * @return the stubbed mock task (no timer behavior is stubbed here)
+	 */
+	private static StreamTask<?, ?> createMockTask() {
+		// the environment handed out by the task
+		final Environment environment = mock(Environment.class);
+		when(environment.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
+		when(environment.getNumberOfSubtasks()).thenReturn(1);
+		when(environment.getIndexInSubtaskGroup()).thenReturn(0);
+
+		// the task itself
+		final StreamTask<?, ?> mockedTask = mock(StreamTask.class);
+		when(mockedTask.getEnvironment()).thenReturn(environment);
+		when(mockedTask.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(mockedTask.getName()).thenReturn("Test task name");
+		when(mockedTask.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
+
+		return mockedTask;
+	}
+
+	/**
+	 * Creates the mock task from {@link #createMockTask()} and additionally stubs
+	 * {@code registerTimer(long, Triggerable)}: every registration schedules the given
+	 * {@link Triggerable} on {@code timerService}, delayed by the difference between the
+	 * requested timestamp and the current system time. The trigger call itself is made
+	 * while holding {@code lock}.
+	 *
+	 * @param timerService executor on which the registered timers are scheduled
+	 * @param lock monitor that is held while a timer's {@code trigger} method runs
+	 * @return the stubbed mock task
+	 */
+	private static StreamTask<?, ?> createMockTaskWithTimer(
+			final ScheduledExecutorService timerService, final Object lock)
+	{
+		final StreamTask<?, ?> task = createMockTask();
+
+		final Answer<Void> scheduleOnExecutor = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final Object[] args = invocation.getArguments();
+				final Long fireAt = (Long) args[0];
+				final Triggerable callback = (Triggerable) args[1];
+
+				final Callable<Object> fire = new Callable<Object>() {
+					@Override
+					public Object call() throws Exception {
+						synchronized (lock) {
+							callback.trigger(fireAt);
+						}
+						return null;
+					}
+				};
+
+				// the delay is computed after the callable exists, as in the inline form
+				final long delayMillis = fireAt - System.currentTimeMillis();
+				timerService.schedule(fire, delayMillis, TimeUnit.MILLISECONDS);
+				return null;
+			}
+		};
+
+		doAnswer(scheduleOnExecutor).when(task).registerTimer(anyLong(), any(Triggerable.class));
+		return task;
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 06fca6b..6c48668 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -142,7 +142,7 @@ public class StreamTaskTestHarness<OUT> {
outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
streamConfig.setOutEdgesInOrder(outEdgesInOrder);
streamConfig.setNonChainedOutputs(outEdgesInOrder);
- streamConfig.setTypeSerializerOut1(outputSerializer);
+ streamConfig.setTypeSerializerOut(outputSerializer);
streamConfig.setVertexID(0);
}
@@ -243,8 +243,8 @@ public class StreamTaskTestHarness<OUT> {
// first wait for all input queues to be empty
try {
Thread.sleep(1);
- } catch (InterruptedException e) {
- }
+ } catch (InterruptedException ignored) {}
+
while (true) {
boolean allEmpty = true;
for (int i = 0; i < numInputGates; i++) {
@@ -254,8 +254,8 @@ public class StreamTaskTestHarness<OUT> {
}
try {
Thread.sleep(10);
- } catch (InterruptedException e) {
- }
+ } catch (InterruptedException ignored) {}
+
if (allEmpty) {
break;
}
@@ -273,8 +273,7 @@ public class StreamTaskTestHarness<OUT> {
try {
Thread.sleep(1);
- } catch (InterruptedException e) {
- }
+ } catch (InterruptedException ignored) {}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
index 7a53ceb..cdc2c53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -59,7 +60,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
DataStream<String> source = env.addSource(new InfiniteTestSource());
- source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(StreamOperator.ChainingStrategy.ALWAYS));
+ source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));
boolean testSuccess = false;
try {
@@ -95,7 +96,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
DataStream<String> source = env.addSource(new InfiniteTestSource());
- source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(StreamOperator.ChainingStrategy.NEVER));
+ source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));
boolean testSuccess = false;
try {
@@ -134,7 +135,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
source.connect(source).transform(
"Custom Operator",
BasicTypeInfo.STRING_TYPE_INFO,
- new TwoInputTimerOperator(StreamOperator.ChainingStrategy.NEVER));
+ new TwoInputTimerOperator(ChainingStrategy.NEVER));
boolean testSuccess = false;
try {
@@ -180,7 +181,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
}
if (first) {
- getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+ registerTimer(System.currentTimeMillis() + 100, this);
first = false;
}
numElements++;
@@ -197,7 +198,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
try {
numTimers++;
throwIfDone();
- getRuntimeContext().registerTimer(System.currentTimeMillis() + 1, this);
+ registerTimer(System.currentTimeMillis() + 1, this);
} finally {
semaphore.release();
}
@@ -236,7 +237,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
}
if (first) {
- getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+ registerTimer(System.currentTimeMillis() + 100, this);
first = false;
}
numElements++;
@@ -251,7 +252,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
}
if (first) {
- getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+ registerTimer(System.currentTimeMillis() + 100, this);
first = false;
}
numElements++;
@@ -269,7 +270,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
try {
numTimers++;
throwIfDone();
- getRuntimeContext().registerTimer(System.currentTimeMillis() + 1, this);
+ registerTimer(System.currentTimeMillis() + 1, this);
} finally {
semaphore.release();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index a88aa1a..dafba9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -439,8 +439,8 @@ public class TimestampITCase {
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open() throws Exception {
+ super.open();
watermarks = new ArrayList<Watermark>();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 3651230..000a1a2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -21,17 +21,33 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class MockContext<IN, OUT> {
+
private Collection<IN> inputs;
private List<OUT> outputs;
@@ -57,27 +73,63 @@ public class MockContext<IN, OUT> {
public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
- StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
- new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- new ExecutionConfig(),
- null, null,
- new HashMap<String, Accumulator<?, ?>>(),
- null);
-
- operator.setup(mockContext.output, runtimeContext);
+
+ final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+ final Object lock = new Object();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+ operator.setup(mockTask, new StreamConfig(new Configuration()), mockContext.output);
try {
- operator.open(null);
+ operator.open();
StreamRecord<IN> nextRecord;
for (IN in: inputs) {
- operator.processElement(new StreamRecord<IN>(in));
+ synchronized (lock) {
+ operator.processElement(new StreamRecord<IN>(in));
+ }
}
operator.close();
} catch (Exception e) {
throw new RuntimeException("Cannot invoke operator.", e);
+ } finally {
+ timerService.shutdownNow();
}
return mockContext.getOutputs();
}
+
+ private static StreamTask<?, ?> createMockTaskWithTimer(
+ final ScheduledExecutorService timerService, final Object lock)
+ {
+ StreamTask<?, ?> task = mock(StreamTask.class);
+ when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
+ when(task.getName()).thenReturn("Test task name");
+ when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+ when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024));
+ when(task.getCheckpointLock()).thenReturn(lock);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ final Long timestamp = (Long) invocationOnMock.getArguments()[0];
+ final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
+ timerService.schedule(
+ new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ synchronized (lock) {
+ target.trigger(timestamp);
+ }
+ return null;
+ }
+ },
+ timestamp - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS);
+ return null;
+ }
+ }).when(task).registerTimer(anyLong(), any(Triggerable.class));
+
+ return task;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index f5ce3fc..edf3a09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,25 +18,28 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.mockito.stubbing.OngoingStubbing;
-import java.io.Serializable;
import java.util.Collection;
-import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* A test harness for testing a {@link OneInputStreamOperator}.
*
@@ -47,28 +50,39 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class OneInputStreamOperatorTestHarness<IN, OUT> {
- OneInputStreamOperator<IN, OUT> operator;
+ final OneInputStreamOperator<IN, OUT> operator;
- ConcurrentLinkedQueue<Object> outputList;
+ final ConcurrentLinkedQueue<Object> outputList;
- ExecutionConfig executionConfig;
+ final ExecutionConfig executionConfig;
+
+ final Object checkpointLock;
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
+ this(operator, new StreamConfig(new Configuration()));
+ }
+
+ public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, StreamConfig config) {
this.operator = operator;
+ this.outputList = new ConcurrentLinkedQueue<Object>();
+ this.executionConfig = new ExecutionConfig();
+ this.checkpointLock = new Object();
+
+ Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
+ StreamTask<?, ?> mockTask = mock(StreamTask.class);
+ when(mockTask.getName()).thenReturn("Mock Task");
+ when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+ when(mockTask.getConfiguration()).thenReturn(config);
+ when(mockTask.getEnvironment()).thenReturn(env);
+ when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+
+ // ugly Java generic hacks
+ @SuppressWarnings("unchecked")
+ OngoingStubbing<StateBackend<?>> stubbing =
+ (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
+ stubbing.thenReturn(MemoryStateBackend.defaultInstance());
- outputList = new ConcurrentLinkedQueue<Object>();
-
- executionConfig = new ExecutionConfig();
-
- StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
- new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- executionConfig,
- null,
- new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
- new HashMap<String, Accumulator<?, ?>>(),
- new OneInputStreamTask());
-
- operator.setup(new MockOutput(), runtimeContext);
+ operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput());
}
/**
@@ -81,19 +95,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)}
- * with an empty {@link org.apache.flink.configuration.Configuration}.
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
*/
public void open() throws Exception {
- operator.open(new Configuration());
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(org.apache.flink.configuration.Configuration)}
- * with the given {@link org.apache.flink.configuration.Configuration}.
- */
- public void open(Configuration config) throws Exception {
- operator.open(config);
+ operator.open();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 428131a..2afdc40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.util;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -29,26 +28,30 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import static org.mockito.Mockito.*;
public class SourceFunctionUtil<T> {
public static <T> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
List<T> outputs = new ArrayList<T>();
+
if (sourceFunction instanceof RichFunction) {
+
+ AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
+ when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
+
RuntimeContext runtimeContext = new StreamingRuntimeContext(
+ operator,
new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- new ExecutionConfig(),
- null,
- new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
- new HashMap<String, Accumulator<?, ?>>(),
- null);
+ new HashMap<String, Accumulator<?, ?>>());
((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 2418f19..9b33c6a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -19,24 +19,27 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.mockito.stubbing.OngoingStubbing;
-import java.io.Serializable;
-import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* A test harness for testing a {@link TwoInputStreamOperator}.
*
@@ -49,26 +52,37 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
TwoInputStreamOperator<IN1, IN2, OUT> operator;
- ConcurrentLinkedQueue<Object> outputList;
+ final ConcurrentLinkedQueue<Object> outputList;
+
+ final ExecutionConfig executionConfig;
- ExecutionConfig executionConfig;
+ final Object checkpointLock;
public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+ this(operator, new StreamConfig(new Configuration()));
+ }
+
+ public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, StreamConfig config) {
this.operator = operator;
+ this.outputList = new ConcurrentLinkedQueue<Object>();
+ this.executionConfig = new ExecutionConfig();
+ this.checkpointLock = new Object();
+
+ Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
+ StreamTask<?, ?> mockTask = mock(StreamTask.class);
+ when(mockTask.getName()).thenReturn("Mock Task");
+ when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+ when(mockTask.getConfiguration()).thenReturn(config);
+ when(mockTask.getEnvironment()).thenReturn(env);
+ when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+
+ // ugly Java generic hacks
+ @SuppressWarnings("unchecked")
+ OngoingStubbing<StateBackend<?>> stubbing =
+ (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
+ stubbing.thenReturn(MemoryStateBackend.defaultInstance());
- outputList = new ConcurrentLinkedQueue<Object>();
-
- executionConfig = new ExecutionConfig();
-
- StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
- new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- new ExecutionConfig(),
- null,
- new LocalStateHandle.LocalStateHandleProvider<>(),
- new HashMap<String, Accumulator<?, ?>>(),
- new TwoInputStreamTask());
-
- operator.setup(new MockOutput(), runtimeContext);
+ operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput());
}
/**
@@ -82,19 +96,10 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)}
- * with an empty {@link Configuration}.
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
*/
public void open() throws Exception {
- operator.open(new Configuration());
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open(Configuration)}
- * with the given {@link Configuration}.
- */
- public void open(Configuration config) throws Exception {
- operator.open(config);
+ operator.open();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0565f52..6855e00 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -228,11 +228,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
val cleanFun = clean(fun)
+ val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
+
val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
- override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
+ override def getProducedType: TypeInformation[K] = keyType
}
- javaStream.keyBy(keyExtractor)
+ new JavaKeyedStream(javaStream, keyExtractor, keyType)
}
/**
@@ -431,32 +433,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
}
-
- /**
- * Creates a new DataStream by applying the given stateful function to every element of this
- * DataStream. To use state partitioning, a key must be defined using .keyBy(..), in which
- * case an independent state will be kept per key.
- *
- * Note that the user state object needs to be serializable.
- */
- def mapWithState[R: TypeInformation: ClassTag, S](
- fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("Map function must not be null.")
- }
-
- val cleanFun = clean(fun)
- val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {
- override def map(in: T): R = {
- applyWithState(in, cleanFun)
- }
-
- val partitioned = isStatePartitioned
- }
-
- map(mapper)
- }
-
+
/**
* Creates a new DataStream by applying the given function to every element and flattening
* the results.
@@ -501,32 +478,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
- * Creates a new DataStream by applying the given stateful function to every element and
- * flattening the results. To use state partitioning, a key must be defined using .keyBy(..),
- * in which case an independent state will be kept per key.
- *
- * Note that the user state object needs to be serializable.
- */
- def flatMapWithState[R: TypeInformation: ClassTag, S](
- fun: (T, Option[S]) => (TraversableOnce[R], Option[S])):
- DataStream[R] = {
- if (fun == null) {
- throw new NullPointerException("Flatmap function must not be null.")
- }
-
- val cleanFun = clean(fun)
- val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{
- override def flatMap(in: T, out: Collector[R]): Unit = {
- applyWithState(in, cleanFun) foreach out.collect
- }
-
- val partitioned = isStatePartitioned
- }
-
- flatMap(flatMapper)
- }
-
- /**
* Creates a new DataStream that contains only the elements satisfying the given filter predicate.
*/
def filter(filter: FilterFunction[T]): DataStream[T] = {
@@ -549,35 +500,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
this.filter(filter)
}
-
- /**
- * Creates a new DataStream that contains only the elements satisfying the given stateful filter
- * predicate. To use state partitioning, a key must be defined using .keyBy(..), in which case
- * an independent state will be kept per key.
- *
- * Note that the user state object needs to be serializable.
- */
- def filterWithState[S](
- fun: (T, Option[S]) => (Boolean, Option[S])): DataStream[T] = {
- if (fun == null) {
- throw new NullPointerException("Filter function must not be null.")
- }
-
- val cleanFun = clean(fun)
- val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] {
- override def filter(in: T): Boolean = {
- applyWithState(in, cleanFun)
- }
-
- val partitioned = isStatePartitioned
- }
-
- filter(filterFun)
- }
-
- private[flink] def isStatePartitioned: Boolean = {
- javaStream.isInstanceOf[JavaKeyedStream[_, _]]
- }
/**
* Windows this DataStream into tumbling time windows.
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index a588931..84354a3 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -18,18 +18,19 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.streaming.api.datastream.{KeyedStream => KeyedJavaStream, DataStream => JavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
+import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.operators.StreamGroupedReduce
+import org.apache.flink.streaming.api.scala.function.StatefulFunction
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.AbstractTime
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window, TimeWindow}
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
+import org.apache.flink.util.Collector
+
import scala.reflect.ClassTag
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.functions.FoldFunction
-import org.apache.flink.api.common.functions.ReduceFunction
class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
@@ -262,10 +263,99 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
javaStream.getExecutionConfig)
}
- val invokable = new StreamGroupedReduce[T](reducer,javaStream.getKeySelector(),getType())
+ val invokable = new StreamGroupedReduce[T](reducer,
+ getType().createSerializer(getExecutionConfig))
new DataStream[T](javaStream.transform("aggregation", javaStream.getType(),invokable))
.asInstanceOf[DataStream[T]]
}
+
+ // ------------------------------------------------------------------------
+ // functions with state
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new DataStream that contains only the elements satisfying the given stateful filter
+ * predicate. To use state partitioning, a key must be defined using .keyBy(..), in which case
+ * an independent state will be kept per key.
+ *
+ * Note that the user state object needs to be serializable.
+ */
+ def filterWithState[S : TypeInformation](
+ fun: (T, Option[S]) => (Boolean, Option[S])): DataStream[T] = {
+ if (fun == null) {
+ throw new NullPointerException("Filter function must not be null.")
+ }
+
+ val cleanFun = clean(fun)
+ val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+
+ val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] {
+
+ override val stateType: TypeInformation[S] = stateTypeInfo
+
+ override def filter(in: T): Boolean = {
+ applyWithState(in, cleanFun)
+ }
+ }
+
+ filter(filterFun)
+ }
+
+ /**
+ * Creates a new DataStream by applying the given stateful function to every element of this
+ * DataStream. To use state partitioning, a key must be defined using .keyBy(..), in which
+ * case an independent state will be kept per key.
+ *
+ * Note that the user state object needs to be serializable.
+ */
+ def mapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+ fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
+ if (fun == null) {
+ throw new NullPointerException("Map function must not be null.")
+ }
+
+ val cleanFun = clean(fun)
+ val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+
+ val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {
+
+ override val stateType: TypeInformation[S] = stateTypeInfo
+
+ override def map(in: T): R = {
+ applyWithState(in, cleanFun)
+ }
+ }
+
+ map(mapper)
+ }
+
+ /**
+ * Creates a new DataStream by applying the given stateful function to every element and
+ * flattening the results. To use state partitioning, a key must be defined using .keyBy(..),
+ * in which case an independent state will be kept per key.
+ *
+ * Note that the user state object needs to be serializable.
+ */
+ def flatMapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+ fun: (T, Option[S]) => (TraversableOnce[R], Option[S])): DataStream[R] = {
+ if (fun == null) {
+ throw new NullPointerException("Flatmap function must not be null.")
+ }
+
+ val cleanFun = clean(fun)
+ val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+
+ val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{
+
+ override val stateType: TypeInformation[S] = stateTypeInfo
+
+ override def flatMap(in: T, out: Collector[R]): Unit = {
+ applyWithState(in, cleanFun) foreach out.collect
+ }
+ }
+
+ flatMap(flatMapper)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f767aba..29bf938 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,13 +19,14 @@
package org.apache.flink.streaming.api.scala
import java.util.Objects
+import java.util.Objects._
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.runtime.state.StateHandleProvider
+import org.apache.flink.streaming.api.state.StateBackend
import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
@@ -184,17 +185,39 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.enableCheckpointing()
this
}
-
- /**
- * Sets the given StateHandleProvider to be used for storing operator state
- * checkpoints when checkpointing is enabled.
- */
- def setStateHandleProvider(provider: StateHandleProvider[_]): StreamExecutionEnvironment = {
- javaEnv.setStateHandleProvider(provider)
+
+ def getCheckpointingMode = javaEnv.getCheckpointingMode()
+
+ /**
+ * Sets the state backend that describes how to store and checkpoint operator state.
+ * It defines in what form the key/value state, accessible from operations on
+ * [[KeyedStream]], is maintained (heap, managed memory, externally), and where state
+ * snapshots/checkpoints are stored, both for the key/value state, and for checkpointed
+ * functions (implementing the interface
+ * [[org.apache.flink.streaming.api.checkpoint.Checkpointed]]).
+ *
+ * <p>The [[org.apache.flink.streaming.api.state.memory.MemoryStateBackend]] for example
+ * maintains the state in heap memory, as objects. It is lightweight without extra
+ * dependencies, but can checkpoint only small states (some counters).
+ *
+ * <p>In contrast, the [[org.apache.flink.streaming.api.state.filesystem.FsStateBackend]]
+ * stores checkpoints of the state (also maintained as heap objects) in files. When using
+ * a replicated file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee
+ * that state is not lost upon failures of individual nodes and that the entire streaming
+ * program can be executed in a highly available and strongly consistent manner (assuming
+ * that Flink is run in high-availability mode).
+ */
+ def setStateBackend(backend: StateBackend[_]): StreamExecutionEnvironment = {
+ javaEnv.setStateBackend(backend)
this
}
/**
+ * Returns the state backend that defines how to store and checkpoint state.
+ */
+ def getStateBackend: StateBackend[_] = javaEnv.getStateBackend()
+
+ /**
* Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of "-1" indicates that the system
* default value (as defined in the configuration) should be used.
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index 89c9d00..5a591a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.scala.function
import org.apache.flink.api.common.functions.RichFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.state.OperatorState
@@ -28,17 +29,20 @@ import org.apache.flink.api.common.state.OperatorState
* call the applyWithState method in their own RichFunction implementation.
*/
trait StatefulFunction[I, O, S] extends RichFunction {
-
- var state: OperatorState[Option[S]] = _
- val partitioned: Boolean
+
+ var state: OperatorState[S] = _
+ val stateType: TypeInformation[S]
def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
- val (o, s) = fun(in, state.value)
- state.update(s)
+ val (o, s: Option[S]) = fun(in, Option(state.value()))
+ s match {
+ case Some(v) => state.update(v)
+ case None => state.update(null.asInstanceOf[S])
+ }
o
}
override def open(c: Configuration) = {
- state = getRuntimeContext().getOperatorState("state", None, partitioned)
+ state = getRuntimeContext().getKeyValueState[S](stateType, null.asInstanceOf[S])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 91639ed..fe85fd1 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.scala
import java.lang
-import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner, FoldFunction, Function}
+import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.co.CoMapFunction
@@ -28,12 +28,13 @@ import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, Stre
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{PurgingTrigger, CountTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.runtime.partitioner._
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
+
import org.junit.Assert.fail
import org.junit.Test
-import org.apache.flink.streaming.api.scala.function.StatefulFunction
class DataStreamTest extends StreamingMultipleProgramsTestBase {
@@ -239,7 +240,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
* Tests whether parallelism gets set.
*/
@Test
- def testParallelism {
+ def testParallelism() {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
@@ -259,7 +260,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
try {
src.setParallelism(3)
- fail
+ fail()
}
catch {
case success: IllegalArgumentException => {
@@ -290,14 +291,14 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
@Test
- def testTypeInfo {
+ def testTypeInfo() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val src1: DataStream[Long] = env.generateSequence(0, 0)
assert(TypeExtractor.getForClass(classOf[Long]) == src1.getType)
val map: DataStream[(Integer, String)] = src1.map(x => null)
- assert(classOf[scala.Tuple2[Integer, String]] == map.getType.getTypeClass)
+ assert(classOf[scala.Tuple2[Integer, String]] == map.getType().getTypeClass)
val window: DataStream[String] = map
.windowAll(GlobalWindows.create())
@@ -310,12 +311,12 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
.fold(0, (accumulator: Int, value: String) => 0)
- assert(TypeExtractor.getForClass(classOf[Int]) == flatten.getType)
+ assert(TypeExtractor.getForClass(classOf[Int]) == flatten.getType())
// TODO check for custom case class
}
- @Test def operatorTest {
+ @Test def operatorTest() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val src = env.generateSequence(0, 0)
@@ -327,20 +328,14 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val map = src.map(mapFunction)
assert(mapFunction == getFunctionForDataStream(map))
assert(getFunctionForDataStream(map.map(x => 0)).isInstanceOf[MapFunction[_, _]])
-
- val statefulMap1 = src.mapWithState((in, state: Option[Long]) => (in, None))
- assert(getFunctionForDataStream(statefulMap1).isInstanceOf[MapFunction[_,_]])
- assert(!getFunctionForDataStream(statefulMap1).
- asInstanceOf[StatefulFunction[_,_,_]].partitioned)
- val statefulMap2 = src.keyBy(x=>x).mapWithState(
- (in, state: Option[Long]) => (in, None))
- assert(getFunctionForDataStream(statefulMap2).
- asInstanceOf[StatefulFunction[_,_,_]].partitioned)
+ val statefulMap2 = src.keyBy(x => x).mapWithState(
+ (in, state: Option[Long]) => (in, None.asInstanceOf[Option[Long]]))
val flatMapFunction = new FlatMapFunction[Long, Int] {
override def flatMap(value: Long, out: Collector[Int]): Unit = {}
}
+
val flatMap = src.flatMap(flatMapFunction)
assert(flatMapFunction == getFunctionForDataStream(flatMap))
assert(
@@ -348,15 +343,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
.flatMap((x: Int, out: Collector[Int]) => {}))
.isInstanceOf[FlatMapFunction[_, _]])
- val statefulfMap1 = src.flatMapWithState((in, state: Option[Long]) => (List(in), None))
- assert(getFunctionForDataStream(statefulfMap1).isInstanceOf[FlatMapFunction[_, _]])
- assert(!getFunctionForDataStream(statefulfMap1).
- asInstanceOf[StatefulFunction[_, _, _]].partitioned)
-
- val statefulfMap2 = src.keyBy(x=>x).flatMapWithState(
- (in, state: Option[Long]) => (List(in), None))
- assert(getFunctionForDataStream(statefulfMap2).
- asInstanceOf[StatefulFunction[_, _, _]].partitioned)
+ val statefulfMap2 = src.keyBy(x => x).flatMapWithState(
+ (in, state: Option[Long]) => (List(in), None.asInstanceOf[Option[Long]]))
val filterFunction = new FilterFunction[Int] {
override def filter(value: Int): Boolean = false
@@ -369,15 +357,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
.filter((x: Int) => true))
.isInstanceOf[FilterFunction[_]])
- val statefulFilter1 = src.filterWithState((in, state: Option[Long]) => (true, None))
- assert(getFunctionForDataStream(statefulFilter1).isInstanceOf[FilterFunction[_]])
- assert(!getFunctionForDataStream(statefulFilter1).
- asInstanceOf[StatefulFunction[_, _, _]].partitioned)
-
- val statefulFilter2 = src.keyBy(x=>x).filterWithState(
+ val statefulFilter2 = src.keyBy( x => x).filterWithState[Long](
(in, state: Option[Long]) => (false, None))
- assert(getFunctionForDataStream(statefulFilter2).
- asInstanceOf[StatefulFunction[_, _, _]].partitioned)
try {
env.getStreamGraph.getStreamEdge(map.getId, unionFilter.getId)
@@ -412,7 +393,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(2 == moreOutputSelectors.size)
val select = split.select("a")
- val sink = select.print
+ val sink = select.print()
val splitEdge =
env.getStreamGraph.getStreamEdge(unionFilter.getId, sink.getTransformation.getId)
assert("a" == splitEdge.getSelectedNames.get(0))
@@ -457,44 +438,44 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
@Test
- def testChannelSelectors {
+ def testChannelSelectors() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val src = env.generateSequence(0, 0)
val broadcast = src.broadcast
- val broadcastSink = broadcast.print
+ val broadcastSink = broadcast.print()
val broadcastPartitioner = env.getStreamGraph
.getStreamEdge(src.getId, broadcastSink.getTransformation.getId).getPartitioner
assert(broadcastPartitioner.isInstanceOf[BroadcastPartitioner[_]])
val shuffle: DataStream[Long] = src.shuffle
- val shuffleSink = shuffle.print
+ val shuffleSink = shuffle.print()
val shufflePartitioner = env.getStreamGraph
.getStreamEdge(src.getId, shuffleSink.getTransformation.getId).getPartitioner
assert(shufflePartitioner.isInstanceOf[ShufflePartitioner[_]])
val forward: DataStream[Long] = src.forward
- val forwardSink = forward.print
+ val forwardSink = forward.print()
val forwardPartitioner = env.getStreamGraph
.getStreamEdge(src.getId, forwardSink.getTransformation.getId).getPartitioner
assert(forwardPartitioner.isInstanceOf[ForwardPartitioner[_]])
val rebalance: DataStream[Long] = src.rebalance
- val rebalanceSink = rebalance.print
+ val rebalanceSink = rebalance.print()
val rebalancePartitioner = env.getStreamGraph
.getStreamEdge(src.getId, rebalanceSink.getTransformation.getId).getPartitioner
assert(rebalancePartitioner.isInstanceOf[RebalancePartitioner[_]])
val global: DataStream[Long] = src.global
- val globalSink = global.print
+ val globalSink = global.print()
val globalPartitioner = env.getStreamGraph
.getStreamEdge(src.getId, globalSink.getTransformation.getId).getPartitioner
assert(globalPartitioner.isInstanceOf[GlobalPartitioner[_]])
}
@Test
- def testIterations {
+ def testIterations() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// we need to rebalance before iteration
val source = env.fromElements(1, 2, 3).map { t: Int => t }
@@ -512,10 +493,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
(head.filter(_ == "2"), head.filter(_ != "2"))
}, 1000).print()
- fail
+ fail()
} catch {
case uoe: UnsupportedOperationException =>
- case e: Exception => fail
+ case e: Exception => fail()
}
val sg = env.getStreamGraph
@@ -531,7 +512,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
dataStream.print()
val operator = getOperatorForDataStream(dataStream)
.asInstanceOf[AbstractUdfStreamOperator[_, _]]
- return operator.getUserFunction.asInstanceOf[Function]
+ operator.getUserFunction.asInstanceOf[Function]
}
private def getOperatorForDataStream(dataStream: DataStream[_]): StreamOperator[_] = {
@@ -542,15 +523,15 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
private def isPartitioned(edge: StreamEdge): Boolean = {
- return edge.getPartitioner.isInstanceOf[HashPartitioner[_]]
+ edge.getPartitioner.isInstanceOf[HashPartitioner[_]]
}
private def isCustomPartitioned(edge: StreamEdge): Boolean = {
- return edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]]
+ edge.getPartitioner.isInstanceOf[CustomPartitionerWrapper[_, _]]
}
private def createDownStreamId(dataStream: DataStream[_]): Integer = {
- return dataStream.print.getTransformation.getId
+ dataStream.print().getTransformation.getId
}
private def createDownStreamId(dataStream: ConnectedStreams[_, _]): Integer = {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
index 650fd7e..7904bcb 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StateTestPrograms.scala
@@ -17,8 +17,9 @@
*/
package org.apache.flink.streaming.api.scala
+import java.util
+
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import java.util.HashSet
/**
* Test programs for stateful functions.
@@ -30,11 +31,13 @@ object StateTestPrograms {
// test stateful map
env.generateSequence(0, 10).setParallelism(1)
+ .keyBy(x => x)
.mapWithState((in, count: Option[Long]) =>
count match {
- case Some(c) => ((in - c), Some(c + 1))
+ case Some(c) => (in - c, Some(c + 1))
case None => (in, Some(1L))
}).setParallelism(1)
+
.addSink(new RichSinkFunction[Long]() {
var allZero = true
override def invoke(in: Long) = {
@@ -46,13 +49,17 @@ object StateTestPrograms {
})
// test stateful flatmap
- env.fromElements("Fir st-", "Hello world").flatMapWithState((w, s: Option[String]) =>
- s match {
- case Some(s) => (w.split(" ").toList.map(s + _), Some(w))
- case None => (List(w), Some(w))
- }).setParallelism(1)
+ env.fromElements("Fir st-", "Hello world")
+ .keyBy(x => x)
+ .flatMapWithState((w, s: Option[String]) =>
+ s match {
+ case Some(state) => (w.split(" ").toList.map(state + _), Some(w))
+ case None => (List(w), Some(w))
+ })
+ .setParallelism(1)
+
.addSink(new RichSinkFunction[String]() {
- val received = new HashSet[String]()
+ val received = new util.HashSet[String]()
override def invoke(in: String) = { received.add(in) }
override def close() = {
assert(received.size() == 3)