You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:04 UTC
[04/14] flink git commit: [FLINK-1638] [streaming] Separated
AbstractRecordReader for streaming and batch
[FLINK-1638] [streaming] Separated AbstractRecordReader for streaming and batch
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9a39926
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9a39926
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9a39926
Branch: refs/heads/master
Commit: c9a399268309768738e65af3c52525560b85cd0c
Parents: cf49ebb
Author: Gyula Fora <gy...@apache.org>
Authored: Thu Mar 5 19:55:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100
----------------------------------------------------------------------
.../api/reader/AbstractRecordReader.java | 55 +++------
.../reader/StreamingAbstractRecordReader.java | 122 +++++++++++++++++++
.../streaming/io/IndexedMutableReader.java | 4 +-
.../io/StreamingMutableRecordReader.java | 44 +++++++
4 files changed, 186 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 920792c..e70b6ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -18,34 +18,24 @@
package org.apache.flink.runtime.io.network.api.reader;
-import java.io.IOException;
-
-
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.StreamingSuperstep;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
/**
* A record-oriented reader.
* <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- *
- * @param <T>
- * The type of the record that can be read with this record reader.
+ * This abstract base class is used by both the mutable and immutable record readers.
+ *
+ * @param <T> The type of the record that can be read with this record reader.
*/
-abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
- ReaderBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordReader.class);
+abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase {
private final RecordDeserializer<T>[] recordDeserializers;
@@ -53,15 +43,11 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
private boolean isFinished;
- private final BarrierBuffer barrierBuffer;
-
protected AbstractRecordReader(InputGate inputGate) {
super(inputGate);
- barrierBuffer = new BarrierBuffer(inputGate, this);
// Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
- .getNumberOfInputChannels()];
+ this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
}
@@ -86,27 +72,22 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
}
}
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+ final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
if (bufferOrEvent.isBuffer()) {
currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
-
- if (event instanceof StreamingSuperstep) {
- barrierBuffer.processSuperstep(bufferOrEvent);
- } else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
+ }
+ else if (handleEvent(bufferOrEvent.getEvent())) {
+ if (inputGate.isFinished()) {
+ isFinished = true;
+
+ return false;
}
+ else if (hasReachedEndOfSuperstep()) {
+
+ return false;
+ } // else: More data is coming...
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
new file mode 100644
index 0000000..ea2d7a6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/StreamingAbstractRecordReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.StreamingSuperstep;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A record-oriented reader.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * readers.
+ *
+ * @param <T>
+ * The type of the record that can be read with this record reader.
+ */
+public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
+ ReaderBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
+
+ private final RecordDeserializer<T>[] recordDeserializers;
+
+ private RecordDeserializer<T> currentRecordDeserializer;
+
+ private boolean isFinished;
+
+ private final BarrierBuffer barrierBuffer;
+
+ protected StreamingAbstractRecordReader(InputGate inputGate) {
+ super(inputGate);
+ barrierBuffer = new BarrierBuffer(inputGate, this);
+
+ // Initialize one deserializer per input channel
+ this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+ .getNumberOfInputChannels()];
+ for (int i = 0; i < recordDeserializers.length; i++) {
+ recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
+ }
+ }
+
+ protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+ if (isFinished) {
+ return false;
+ }
+
+ while (true) {
+ if (currentRecordDeserializer != null) {
+ DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
+
+ if (result.isBufferConsumed()) {
+ currentRecordDeserializer.getCurrentBuffer().recycle();
+ currentRecordDeserializer = null;
+ }
+
+ if (result.isFullRecord()) {
+ return true;
+ }
+ }
+
+ final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+ if (bufferOrEvent.isBuffer()) {
+ currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
+ currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+ } else {
+ // Event received
+ final AbstractEvent event = bufferOrEvent.getEvent();
+
+ if (event instanceof StreamingSuperstep) {
+ barrierBuffer.processSuperstep(bufferOrEvent);
+ } else {
+ if (handleEvent(event)) {
+ if (inputGate.isFinished()) {
+ isFinished = true;
+ return false;
+ } else if (hasReachedEndOfSuperstep()) {
+ return false;
+ } // else: More data is coming...
+ }
+ }
+ }
+ }
+ }
+
+ public void clearBuffers() {
+ for (RecordDeserializer<?> deserializer : recordDeserializers) {
+ Buffer buffer = deserializer.getCurrentBuffer();
+ if (buffer != null && !buffer.isRecycled()) {
+ buffer.recycle();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
index 025393d..3c8824b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
@@ -19,10 +19,10 @@
package org.apache.flink.streaming.io;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-public class IndexedMutableReader<T extends IOReadableWritable> extends MutableRecordReader<T> {
+public class IndexedMutableReader<T extends IOReadableWritable> extends
+ StreamingMutableRecordReader<T> {
InputGate reader;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9a39926/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
new file mode 100644
index 0000000..ffa436b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingMutableRecordReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.reader.StreamingAbstractRecordReader;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
+ StreamingAbstractRecordReader<T> implements MutableReader<T> {
+
+ public StreamingMutableRecordReader(InputGate inputGate) {
+ super(inputGate);
+ }
+
+ @Override
+ public boolean next(final T target) throws IOException, InterruptedException {
+ return getNextRecord(target);
+ }
+
+ @Override
+ public void clearBuffers() {
+ super.clearBuffers();
+ }
+}