You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/13 20:01:40 UTC
[1/2] flink git commit: [FLINK-2011] [runtime] Improve error message when user-defined serialization logic is erroneous
Repository: flink
Updated Branches:
refs/heads/master 48e21a1ae -> 113b20b7f
[FLINK-2011] [runtime] Improve error message when user-defined serialization logic is erroneous
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/113b20b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/113b20b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/113b20b7
Branch: refs/heads/master
Commit: 113b20b7f8717b12c5f0dfa691da582d426fbae0
Parents: 9da2f1f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 13 15:27:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 13 18:26:26 2015 +0200
----------------------------------------------------------------------
.../api/reader/AbstractRecordReader.java | 29 ++++++++++++++------
...llingAdaptiveSpanningRecordDeserializer.java | 1 -
2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/113b20b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 4ee7fad..bf43c72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -79,16 +79,27 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
- else if (handleEvent(bufferOrEvent.getEvent())) {
- if (inputGate.isFinished()) {
- isFinished = true;
-
- return false;
+ else {
+ // sanity check for leftover data in deserializers. events should only come between
+ // records, not in the middle of a fragment
+ if (recordDeserializers[bufferOrEvent.getChannelIndex()].hasUnfinishedData()) {
+ throw new IllegalStateException(
+ "Received an event in channel " + bufferOrEvent.getChannelIndex() + " while still having "
+ + "data from a record. This indicates broken serialization logic. "
+ + "If you are using custom serialization code (Writable or Value types), check their "
+ + "serialization routines. In the case of Kryo, check the respective Kryo serializer.");
+ }
+
+ if (handleEvent(bufferOrEvent.getEvent())) {
+ if (inputGate.isFinished()) {
+ isFinished = true;
+ return false;
+ }
+ else if (hasReachedEndOfSuperstep()) {
+ return false;
+ }
+ // else: More data is coming...
}
- else if (hasReachedEndOfSuperstep()) {
-
- return false;
- } // else: More data is coming...
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/113b20b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index eae3e65..453d448 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.util.StringUtils;
[2/2] flink git commit: [FLINK-2006] [runtime] Fix testing TaskManager task status request to stabilize tests.
Posted by se...@apache.org.
[FLINK-2006] [runtime] Fix testing TaskManager task status request to stabilize tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9da2f1f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9da2f1f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9da2f1f7
Branch: refs/heads/master
Commit: 9da2f1f725473f1242067acb546607888e3a5015
Parents: 48e21a1
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 13 14:23:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 13 18:26:26 2015 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/testingUtils/TestingTaskManager.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9da2f1f7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 5318254..220e6ca 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -71,8 +71,8 @@ class TestingTaskManager(config: TaskManagerConfiguration,
def receiveTestMessages: Receive = {
case NotifyWhenTaskIsRunning(executionID) => {
Option(runningTasks.get(executionID)) match {
- case Some(_) => sender ! true
- case None =>
+ case Some(task) if task.getExecutionState == ExecutionState.RUNNING => sender ! true
+ case _ =>
val listeners = waitForRunning.getOrElse(executionID, Set())
waitForRunning += (executionID -> (listeners + sender))
}