You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:33 UTC
[5/8] flink git commit: [FLINK-2438] [runtime] Improve channel event
serialization performance.
[FLINK-2438] [runtime] Improve channel event serialization performance.
Because channel events may become very frequent now (frequent at-least-once checkpointing), their serialization perfomance starts to matter.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af88aa09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af88aa09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af88aa09
Branch: refs/heads/master
Commit: af88aa09ec94d4d11f38a7134d36793420d7d19d
Parents: aa0105a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 30 19:02:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:28 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/event/AbstractEvent.java | 28 ++++
.../flink/runtime/event/RuntimeEvent.java | 24 ++++
.../apache/flink/runtime/event/TaskEvent.java | 25 ++++
.../flink/runtime/event/task/AbstractEvent.java | 28 ----
.../flink/runtime/event/task/RuntimeEvent.java | 24 ----
.../flink/runtime/event/task/TaskEvent.java | 25 ----
.../runtime/io/network/TaskEventDispatcher.java | 2 +-
.../io/network/api/CheckpointBarrier.java | 98 ++++++++++++++
.../io/network/api/EndOfPartitionEvent.java | 2 +-
.../io/network/api/EndOfSuperstepEvent.java | 2 +-
.../io/network/api/TaskEventHandler.java | 2 +-
.../io/network/api/reader/AbstractReader.java | 4 +-
.../io/network/api/reader/ReaderBase.java | 2 +-
.../api/serialization/EventSerializer.java | 132 ++++++++++++++-----
.../io/network/api/writer/RecordWriter.java | 2 +-
.../api/writer/ResultPartitionWriter.java | 4 +-
.../network/buffer/FreeingBufferRecycler.java | 43 ++++++
.../runtime/io/network/netty/NettyMessage.java | 2 +-
.../network/netty/PartitionRequestClient.java | 2 +-
.../netty/PartitionRequestClientHandler.java | 5 +-
.../partition/consumer/BufferOrEvent.java | 2 +-
.../partition/consumer/InputChannel.java | 2 +-
.../network/partition/consumer/InputGate.java | 2 +-
.../partition/consumer/LocalInputChannel.java | 2 +-
.../partition/consumer/RemoteInputChannel.java | 2 +-
.../partition/consumer/SingleInputGate.java | 4 +-
.../partition/consumer/UnionInputGate.java | 2 +-
.../partition/consumer/UnknownInputChannel.java | 2 +-
.../iterative/concurrent/SuperstepBarrier.java | 2 +-
.../event/IterationEventWithAggregators.java | 2 +-
.../iterative/event/TerminationEvent.java | 2 +-
.../task/IterationSynchronizationSinkTask.java | 2 +-
.../iterative/task/SyncEventHandler.java | 2 +-
.../flink/runtime/event/task/EventList.java | 3 +-
.../runtime/event/task/IntegerTaskEvent.java | 1 +
.../runtime/event/task/StringTaskEvent.java | 1 +
.../flink/runtime/event/task/TaskEventTest.java | 3 +-
.../network/api/reader/AbstractReaderTest.java | 9 +-
.../io/network/api/reader/BufferReaderTest.java | 2 +-
.../api/serialization/EventSerializerTest.java | 39 ++++--
.../partition/PipelinedSubpartitionTest.java | 2 +-
.../partition/consumer/InputChannelTest.java | 2 +-
.../partition/consumer/SingleInputGateTest.java | 2 +-
.../io/network/util/TestConsumerCallback.java | 2 +-
.../network/util/TestSubpartitionConsumer.java | 2 +-
.../runtime/io/network/util/TestTaskEvent.java | 2 +-
.../concurrent/SuperstepBarrierTest.java | 2 +-
.../util/event/TaskEventHandlerTest.java | 2 +-
.../streaming/runtime/io/BarrierBuffer.java | 2 +-
.../streaming/runtime/io/BarrierTracker.java | 2 +-
.../streaming/runtime/io/BufferSpiller.java | 3 +-
.../runtime/io/CheckpointBarrierHandler.java | 14 +-
.../runtime/io/FreeingBufferRecycler.java | 37 ------
.../runtime/io/RecordWriterOutput.java | 4 +-
.../runtime/io/StreamInputProcessor.java | 4 +-
.../runtime/io/StreamTwoInputProcessor.java | 4 +-
.../runtime/tasks/CheckpointBarrier.java | 97 --------------
.../streaming/runtime/tasks/OutputHandler.java | 1 +
.../streaming/runtime/tasks/StreamTask.java | 1 +
.../consumer/StreamTestSingleInputGate.java | 3 +-
.../io/BarrierBufferMassiveRandomTest.java | 4 +-
.../streaming/runtime/io/BarrierBufferTest.java | 5 +-
.../runtime/io/BarrierTrackerTest.java | 5 +-
.../streaming/runtime/io/BufferSpillerTest.java | 1 +
.../runtime/io/StreamRecordWriterTest.java | 1 +
.../flink/streaming/runtime/io/TestEvent.java | 2 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 1 +
.../tasks/OneInputStreamTaskTestHarness.java | 2 +-
.../runtime/tasks/StreamMockEnvironment.java | 3 +-
.../runtime/tasks/StreamTaskTestHarness.java | 2 +-
.../runtime/tasks/TwoInputStreamTaskTest.java | 3 +-
.../tasks/TwoInputStreamTaskTestHarness.java | 2 +-
.../manual/StreamingScalabilityAndLatency.java | 3 +-
73 files changed, 435 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
new file mode 100644
index 0000000..a20aad9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/AbstractEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * This type of event can be used to exchange notification messages between
+ * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
+ * at runtime using the communication channels.
+ */
+public abstract class AbstractEvent implements IOReadableWritable {}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
new file mode 100644
index 0000000..6d712ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/RuntimeEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+/**
+ * Subclasses of this event are recognized as events exchanged by the core runtime.
+ */
+public abstract class RuntimeEvent extends AbstractEvent {}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
new file mode 100644
index 0000000..d37d031
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/TaskEvent.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event;
+
+/**
+ * Subclasses of this event are recognized as custom events that are not part of the core
+ * flink runtime.
+ */
+public abstract class TaskEvent extends AbstractEvent {}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
deleted file mode 100644
index 244d672..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * This type of event can be used to exchange notification messages between
- * different {@link org.apache.flink.runtime.taskmanager.TaskManager} objects
- * at runtime using the communication channels.
- */
-public abstract class AbstractEvent implements IOReadableWritable {}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
deleted file mode 100644
index 8c44073..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * Subclasses of this event are recognized as events exchanged by the core runtime.
- */
-public abstract class RuntimeEvent extends AbstractEvent {}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
deleted file mode 100644
index 01ecce2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * Subclasses of this event are recognized as custom events that are not part of the core
- * flink runtime.
- */
-public abstract class TaskEvent extends AbstractEvent {}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 82793e2..eddba8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network;
import com.google.common.collect.Maps;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
new file mode 100644
index 0000000..59f56b0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+/**
+ * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
+ * barriers are emitted by the sources when instructed to do so by the JobManager. When
+ * operators receive a CheckpointBarrier on one of its inputs, it knows that this is the point
+ * between the pre-checkpoint and post-checkpoint data.
+ *
+ * <p>Once an operator has received a checkpoint barrier from all its input channels, it
+ * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
+ * behavior and broadcast the barrier to downstream operators.</p>
+ *
+ * <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint
+ * is complete (exactly once)</p>
+ *
+ * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
+ */
+public class CheckpointBarrier extends RuntimeEvent {
+
+ private long id;
+ private long timestamp;
+
+ public CheckpointBarrier() {}
+
+ public CheckpointBarrier(long id, long timestamp) {
+ this.id = id;
+ this.timestamp = timestamp;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeLong(id);
+ out.writeLong(timestamp);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ id = in.readLong();
+ timestamp = in.readLong();
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof CheckpointBarrier)) {
+ return false;
+ }
+ else {
+ CheckpointBarrier that = (CheckpointBarrier) other;
+ return that.id == this.id && that.timestamp == this.timestamp;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index c2b6fa4..293287b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
/**
* This event marks a subpartition as fully consumed.
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
index 162afb7..7f51187 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.RuntimeEvent;
+import org.apache.flink.runtime.event.RuntimeEvent;
/**
* Marks the end of a superstep of one particular iteration superstep.
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
index ccd0feb..d2dc46b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.util.event.EventListener;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 90564a8..84189be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.io.network.api.reader;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 9f8ae20..a1d705f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.api.reader;
import java.io.IOException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.util.event.EventListener;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 76c88c1..b23b83b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -19,63 +19,122 @@
package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.runtime.util.DataOutputSerializer;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+/**
+ * Utility class to serialize and deserialize task events.
+ */
public class EventSerializer {
+
+ private static final int END_OF_PARTITION_EVENT = 0;
- public final static BufferRecycler RECYCLER = new BufferRecycler() {
- @Override
- public void recycle(MemorySegment memorySegment) {
- memorySegment.free();
- }
- };
-
- public static ByteBuffer toSerializedEvent(AbstractEvent event) {
- try {
- final DataOutputSerializer serializer = new DataOutputSerializer(128);
+ private static final int CHECKPOINT_BARRIER_EVENT = 1;
- serializer.writeUTF(event.getClass().getName());
- event.write(serializer);
+ private static final int END_OF_SUPERSTEP_EVENT = 2;
- return serializer.wrapAsByteBuffer();
+ private static final int OTHER_EVENT = 3;
+
+ // ------------------------------------------------------------------------
+
+ public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+ final Class<?> eventClass = event.getClass();
+ if (eventClass == EndOfPartitionEvent.class) {
+ return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
+ }
+ else if (eventClass == CheckpointBarrier.class) {
+ CheckpointBarrier barrier = (CheckpointBarrier) event;
+
+ ByteBuffer buf = ByteBuffer.allocate(20);
+ buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+ buf.putLong(4, barrier.getId());
+ buf.putLong(12, barrier.getTimestamp());
+ return buf;
}
- catch (IOException e) {
- throw new RuntimeException("Error while serializing event.", e);
+ else if (eventClass == EndOfSuperstepEvent.class) {
+ return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
+ }
+ else {
+ try {
+ final DataOutputSerializer serializer = new DataOutputSerializer(128);
+ serializer.writeInt(OTHER_EVENT);
+ serializer.writeUTF(event.getClass().getName());
+ event.write(serializer);
+
+ return serializer.wrapAsByteBuffer();
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Error while serializing event.", e);
+ }
}
}
public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+ if (buffer.remaining() < 4) {
+ throw new RuntimeException("Incomplete event");
+ }
+
+ final ByteOrder bufferOrder = buffer.order();
+ buffer.order(ByteOrder.BIG_ENDIAN);
+
try {
- final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-
- final String className = deserializer.readUTF();
-
- final Class<? extends AbstractEvent> clazz;
- try {
- clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+ int type = buffer.getInt();
+
+ if (type == END_OF_PARTITION_EVENT) {
+ return EndOfPartitionEvent.INSTANCE;
}
- catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not load event class '" + className + "'.", e);
+ else if (type == CHECKPOINT_BARRIER_EVENT) {
+ long id = buffer.getLong();
+ long timestamp = buffer.getLong();
+ return new CheckpointBarrier(id, timestamp);
}
- catch (ClassCastException e) {
- throw new RuntimeException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+ else if (type == END_OF_SUPERSTEP_EVENT) {
+ return EndOfSuperstepEvent.INSTANCE;
+ }
+ else if (type == OTHER_EVENT) {
+ try {
+ final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
+
+ final String className = deserializer.readUTF();
+
+ final Class<? extends AbstractEvent> clazz;
+ try {
+ clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+ }
+ catch (ClassNotFoundException e) {
+ throw new RuntimeException("Could not load event class '" + className + "'.", e);
+ }
+ catch (ClassCastException e) {
+ throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+ + AbstractEvent.class.getName() + "'.", e);
+ }
+
+ final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+ event.read(deserializer);
+
+ return event;
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Error while deserializing or instantiating event.", e);
+ }
+ }
+ else {
+ throw new RuntimeException("Corrupt byte stream for event");
}
-
- final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
- event.read(deserializer);
-
- return event;
}
- catch (IOException e) {
- throw new RuntimeException("Error while deserializing event.", e);
+ finally {
+ buffer.order(bufferOrder);
}
}
@@ -86,7 +145,8 @@ public class EventSerializer {
public static Buffer toBuffer(AbstractEvent event) {
final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
- final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()), RECYCLER, false);
+ final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()),
+ FreeingBufferRecycler.INSTANCE, false);
buffer.setSize(serializedEvent.remaining());
return buffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 885c316..2ae61ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 1192dbb..79c21c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.io.network.api.writer;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
new file mode 100644
index 0000000..fdce883
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A simple buffer recycler that frees the memory segments.
+ */
+public class FreeingBufferRecycler implements BufferRecycler {
+
+ public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
+
+ // ------------------------------------------------------------------------
+
+ // Not instantiable
+ private FreeingBufferRecycler() {}
+
+ /**
+ * Frees the given memory segment.
+ * @param memorySegment The memory segment to be recycled.
+ */
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ memorySegment.free();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 1540369..3a24181 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 78f6398..f6120d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 51b436b..3b7d921 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.io.network.netty;
import com.google.common.collect.Maps;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.netty.exception.TransportException;
@@ -300,7 +301,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
byte[] byteArray = new byte[bufferOrEvent.getSize()];
bufferOrEvent.getNettyBuffer().readBytes(byteArray);
- Buffer buffer = new Buffer(new MemorySegment(byteArray), EventSerializer.RECYCLER, false);
+ Buffer buffer = new Buffer(new MemorySegment(byteArray), FreeingBufferRecycler.INSTANCE, false);
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
index d2f3035..55e5767 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferOrEvent.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import static com.google.common.base.Preconditions.checkArgument;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 3998279..d282db5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index c4f9dc4..1f42cfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.util.event.EventListener;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index e72f612..ff12153 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index b70c3a8..be2509f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 80a79d2..896fa9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 730ead2..5599687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.util.event.EventListener;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index e4b9e57..cdf28be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
index c91be1a..cc5d3c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.concurrent;
import java.util.concurrent.CountDownLatch;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index bc815dc..e259523 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
index 9e74c34..28181e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
/**
* Signals that the iteration is completely executed, participating tasks must terminate now
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 5eccd7b..fed0a17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.types.IntValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 8cce6ef..d7549d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.task;
import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.types.Value;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
index 1f97a15..c055a92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/EventList.java
@@ -19,10 +19,11 @@
package org.apache.flink.runtime.event.task;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.util.SerializableArrayList;
/**
- * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.task.AbstractEvent}
+ * Objects of this class can store and serialize/deserialize {@link org.apache.flink.runtime.event.AbstractEvent}
* objects.
*
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
index 648dacc..cc67482 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.TaskEvent;
/**
* This class provides a simple implementation of an event that holds an integer value.
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
index 87f2e91..9095cc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.util.StringUtils;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
index 1ed8e39..b508923 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
@@ -25,12 +25,13 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Iterator;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Test;
/**
* This class contains serialization tests concerning task events derived from
- * {@link org.apache.flink.runtime.event.task.AbstractEvent}.
+ * {@link org.apache.flink.runtime.event.AbstractEvent}.
*
*/
public class TaskEventTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 14bf022..6853722 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -18,11 +18,10 @@
package org.apache.flink.runtime.io.network.api.reader;
-import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -70,7 +69,7 @@ public class AbstractReaderTest {
public void testEndOfPartitionEvent() throws Exception {
final AbstractReader reader = new MockReader(createInputGate(1));
- assertTrue(reader.handleEvent(new EndOfPartitionEvent()));
+ assertTrue(reader.handleEvent(EndOfPartitionEvent.INSTANCE));
}
/**
@@ -95,7 +94,7 @@ public class AbstractReaderTest {
}
try {
- reader.handleEvent(new EndOfSuperstepEvent());
+ reader.handleEvent(EndOfSuperstepEvent.INSTANCE);
fail("Did not throw expected exception when handling end of superstep event with non-iterative reader.");
}
@@ -122,7 +121,7 @@ public class AbstractReaderTest {
// All good, expected exception.
}
- EndOfSuperstepEvent eos = new EndOfSuperstepEvent();
+ EndOfSuperstepEvent eos = EndOfSuperstepEvent.INSTANCE;
// One end of superstep event for each input channel. The superstep finishes with the last
// received event.
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
index e1f8fd8..8519ac6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.api.reader;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 5a20a4b..ddfd758 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,25 +18,44 @@
package org.apache.flink.runtime.io.network.api.serialization;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+
import org.junit.Test;
import java.nio.ByteBuffer;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
public class EventSerializerTest {
@Test
public void testSerializeDeserializeEvent() {
-
- TestTaskEvent expected = new TestTaskEvent(Math.random(), 12361231273L);
-
- ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(expected);
-
- AbstractEvent actual = EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
-
- assertEquals(expected, actual);
+ try {
+ AbstractEvent[] events = {
+ EndOfPartitionEvent.INSTANCE,
+ EndOfSuperstepEvent.INSTANCE,
+ new CheckpointBarrier(1678L, 4623784L),
+ new TestTaskEvent(Math.random(), 12361231273L)
+ };
+
+ for (AbstractEvent evt : events) {
+ ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+ assertTrue(serializedEvent.hasRemaining());
+
+ AbstractEvent deserialized =
+ EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+ assertNotNull(deserialized);
+ assertEquals(evt, deserialized);
+ }
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 74549df..8750a1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index e95c774..9717530 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2454399..82cc730 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
index 52083c4..0d66f13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.util;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import java.util.concurrent.atomic.AtomicInteger;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 2766e53..18e0d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.util;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
index 0b29032..091d5d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.util;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index a6c0d72..2f26670 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Random;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
index 5c6aeb7..cb76276 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.util.event;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.event.task.IntegerTaskEvent;
import org.apache.flink.runtime.event.task.StringTaskEvent;
import org.apache.flink.runtime.io.network.api.TaskEventHandler;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0441937..b7766ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index a0b924f..119fb23 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.io.IOException;
import java.util.ArrayDeque;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5f9a162..2bad197 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -27,10 +27,11 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.util.StringUtils;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 02dd33d..791fd40 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.io.IOException;
@@ -43,8 +43,18 @@ public interface CheckpointBarrierHandler {
*/
BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+ /**
+ * Registers the given event handler to be notified on successful checkpoints.
+ *
+ * @param checkpointHandler The handler to register.
+ */
void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
-
+
+ /**
+ * Cleans up all internally held resources.
+ *
+ * @throws IOException Thrown, if the cleanup of I/O resources failed.
+ */
void cleanup() throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
deleted file mode 100644
index 27e37a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/FreeingBufferRecycler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-/**
- * A simple buffer recycler that only frees the memory segments.
- */
-public class FreeingBufferRecycler implements BufferRecycler {
-
- public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
-
- // ------------------------------------------------------------------------
-
- @Override
- public void recycle(MemorySegment memorySegment) {
- memorySegment.free();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index de8c205..f0f18b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.operators.Output;
@@ -111,7 +111,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
recordWriter.clearBuffers();
}
- public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+ public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
recordWriter.broadcastEvent(barrier);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index f7d7fb0..4ad5b45 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/flink/blob/af88aa09/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ae97974..e3d2911 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;