You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:04 UTC

[04/14] flink git commit: [FLINK-1638] [streaming] Separated AbstractRecordReader for streaming and batch

[FLINK-1638] [streaming] Separated AbstractRecordReader for streaming and batch


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9a39926
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9a39926
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9a39926

Branch: refs/heads/master
Commit: c9a399268309768738e65af3c52525560b85cd0c
Parents: cf49ebb
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Mar 5 19:55:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../api/reader/AbstractRecordReader.java        |  55 +++------
 .../reader/StreamingAbstractRecordReader.java   | 122 +++++++++++++++++++
 .../streaming/io/IndexedMutableReader.java      |   4 +-
 .../io/StreamingMutableRecordReader.java        |  44 +++++++
 4 files changed, 186 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 920792c..e70b6ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -18,34 +18,24 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import java.io.IOException;
-
-
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 
 /**
  * A record-oriented reader.
  * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- * 
- * @param <T>
- *            The type of the record that can be read with this record reader.
+ * This abstract base class is used by both the mutable and immutable record readers.
+ *
+ * @param <T> The type of the record that can be read with this record reader.
  */
-abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
-		ReaderBase {
-
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class);
+abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase {
 
 	private final RecordDeserializer<T>[] recordDeserializers;
 
@@ -53,15 +43,11 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 
 	private boolean isFinished;
 
-	private final BarrierBuffer barrierBuffer;
-
 	protected AbstractRecordReader(InputGate inputGate) {
 		super(inputGate);
-		barrierBuffer = new BarrierBuffer(inputGate, this);
 
 		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
-				.getNumberOfInputChannels()];
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
 		for (int i = 0; i < recordDeserializers.length; i++) {
 			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
 		}
@@ -86,27 +72,22 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 				}
 			}
 
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+			final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
 
 			if (bufferOrEvent.isBuffer()) {
 				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
 				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
-
-				if (event instanceof StreamingSuperstep) {
-					barrierBuffer.processSuperstep(bufferOrEvent);
-				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
+			}
+			else if (handleEvent(bufferOrEvent.getEvent())) {
+				if (inputGate.isFinished()) {
+					isFinished = true;
+
+					return false;
 				}
+				else if (hasReachedEndOfSuperstep()) {
+
+					return false;
+				} // else: More data is coming...
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
new file mode 100644
index 0000000..ea2d7a6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A record-oriented reader.
+ * <p>
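+ * This abstract base class is the streaming variant of {@link AbstractRecordReader}: it reads
+ * through a {@link BarrierBuffer} and passes {@link StreamingSuperstep} events on to that buffer.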
+ * 
+ * @param <T>
+ *            The type of the record that can be read with this record reader.
+ */
+public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
+		ReaderBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
+
+	private final RecordDeserializer<T>[] recordDeserializers;
+
+	private RecordDeserializer<T> currentRecordDeserializer;
+
+	private boolean isFinished;
+
+	private final BarrierBuffer barrierBuffer;
+
+	protected StreamingAbstractRecordReader(InputGate inputGate) {
+		super(inputGate);
+		barrierBuffer = new BarrierBuffer(inputGate, this);
+
+		// Initialize one deserializer per input channel
+		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+				.getNumberOfInputChannels()];
+		for (int i = 0; i < recordDeserializers.length; i++) {
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
+		}
+	}
+
+	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+		if (isFinished) {
+			return false;
+		}
+
+		while (true) {
+			if (currentRecordDeserializer != null) {
+				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
+
+				if (result.isBufferConsumed()) {
+					currentRecordDeserializer.getCurrentBuffer().recycle();
+					currentRecordDeserializer = null;
+				}
+
+				if (result.isFullRecord()) {
+					return true;
+				}
+			}
+
+			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+			if (bufferOrEvent.isBuffer()) {
+				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
+				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+			} else {
+				// Event received
+				final AbstractEvent event = bufferOrEvent.getEvent();
+
+				if (event instanceof StreamingSuperstep) {
+					barrierBuffer.processSuperstep(bufferOrEvent);
+				} else {
+					if (handleEvent(event)) {
+						if (inputGate.isFinished()) {
+							isFinished = true;
+							return false;
+						} else if (hasReachedEndOfSuperstep()) {
+							return false;
+						} // else: More data is coming...
+					}
+				}
+			}
+		}
+	}
+
+	public void clearBuffers() {
+		for (RecordDeserializer<?> deserializer : recordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
index 025393d..3c8824b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
@@ -19,10 +19,10 @@
 package org.apache.flink.streaming.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
-public class IndexedMutableReader<T extends IOReadableWritable> extends MutableRecordReader<T> {
+public class IndexedMutableReader<T extends IOReadableWritable> extends
+		StreamingMutableRecordReader<T> {
 
 	InputGate reader;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
new file mode 100644
index 0000000..ffa436b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
+		StreamingAbstractRecordReader<T> implements MutableReader<T> {
+
+	public StreamingMutableRecordReader(InputGate inputGate) {
+		super(inputGate);
+	}
+
+	@Override
+	public boolean next(final T target) throws IOException, InterruptedException {
+		return getNextRecord(target);
+	}
+
+	@Override
+	public void clearBuffers() {
+		super.clearBuffers();
+	}
+}