You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:32 UTC
[4/8] flink git commit: [FLINK-2438] [runtime] Improve channel event
serialization performance.
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
deleted file mode 100644
index d94b5b4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
-
-/**
- * Checkpoint barriers are used to synchronize checkpoints throughout the streaming topology. The
- * barriers are emitted by the sources when instructed to do so by the JobManager. When
- * operators receive a {@link CheckpointBarrier} on one of its inputs it must block processing
- * of further elements on this input until all inputs received the checkpoint barrier
- * corresponding to to that checkpoint. Once all inputs received the checkpoint barrier for
- * a checkpoint the operator is to perform the checkpoint and then broadcast the barrier to
- * downstream operators.
- *
- * <p>
- * The checkpoint barrier IDs are advancing. Once an operator receives a {@link CheckpointBarrier}
- * for a checkpoint with a higher id it is to discard all barriers that it received from previous
- * checkpoints and unblock all other inputs.
- */
-public class CheckpointBarrier extends TaskEvent {
-
- protected long id;
- protected long timestamp;
-
- public CheckpointBarrier() {}
-
- public CheckpointBarrier(long id, long timestamp) {
- this.id = id;
- this.timestamp = timestamp;
- }
-
- public long getId() {
- return id;
- }
-
- public long getTimestamp() {
- return id;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeLong(id);
- out.writeLong(timestamp);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- id = in.readLong();
- timestamp = in.readLong();
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof CheckpointBarrier)) {
- return false;
- }
- else {
- CheckpointBarrier that = (CheckpointBarrier) other;
- return that.id == this.id && that.timestamp == this.timestamp;
- }
- }
-
- @Override
- public String toString() {
- return String.format("CheckpointBarrier %d @ %d", id, timestamp);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 84614bf..c8fa9e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2098da8..aabc95d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 4007da8..173e894 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 7350516..678b145 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Random;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index b8b3a8c..872e226 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -19,15 +19,16 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.junit.AfterClass;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 532078c..fb61633 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,12 +19,13 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index ae384e1..05ce4be 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index fb3beea..f07e3a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.types.LongValue;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
index 4a77757..286477a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.util.StringUtils;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 4399a10..296324a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index a8029e6..4f07fdb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index df0c9ee..bbc64f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -18,14 +18,13 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 435831f..0f372cb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 3c7204d..f87d7ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
@@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,7 +36,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index f37eb66..2b20101 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -45,7 +45,7 @@ import java.util.List;
* a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
* thread to finish. Use {@link #processElement}
* to send elements to the task. Use
- * {@link #processEvent(org.apache.flink.runtime.event.task.AbstractEvent)} to send events to the task.
+ * {@link #processEvent(org.apache.flink.runtime.event.AbstractEvent)} to send events to the task.
* Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
* that data entry is finished.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index a34ec15..8d41292 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -74,7 +75,7 @@ public class StreamingScalabilityAndLatency {
env.getConfig().enableObjectReuse();
env.setBufferTimeout(5L);
-// env.enableCheckpointing(1000);
+ env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env
.addSource(new TimeStampingSource())