You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:16 UTC
[6/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
index 247fe25..9bf4eb4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -1,41 +1,41 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
import java.util.concurrent.BlockingQueue;
import org.apache.flink.runtime.iterative.concurrent.Broker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
- /**
- * Singleton instance
- */
- private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
-
- private BlockingQueueBroker() {
- }
-
- /**
- * retrieve singleton instance
- */
- public static Broker<BlockingQueue<StreamRecord>> instance() {
- return INSTANCE;
- }
-}
+
+@SuppressWarnings("rawtypes")
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
+ /**
+ * Singleton instance
+ */
+ private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
+
+ private BlockingQueueBroker() {
+ }
+
+ /**
+ * retrieve singleton instance
+ */
+ public static Broker<BlockingQueue<StreamRecord>> instance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
deleted file mode 100644
index 4358810..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
-
-/**
- * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
- * input types.
- */
-public class CoReaderIterator<T1, T2> {
-
- private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
- // source
-
- protected final ReusingDeserializationDelegate<T1> delegate1;
- protected final ReusingDeserializationDelegate<T2> delegate2;
-
- public CoReaderIterator(
- CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
- TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
- this.reader = reader;
- this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
- this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
- }
-
- public int next(T1 target1, T2 target2) throws IOException {
- this.delegate1.setInstance(target1);
- this.delegate2.setInstance(target2);
-
- try {
- return this.reader.getNextRecord(this.delegate1, this.delegate2);
-
- } catch (InterruptedException e) {
- throw new IOException("Reader interrupted.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
deleted file mode 100644
index a7139b6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
-
-/**
- * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
- * types to read records effectively.
- */
-@SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
- AbstractReader implements EventListener<InputGate>, StreamingReader {
-
- private final InputGate bufferReader1;
-
- private final InputGate bufferReader2;
-
- private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
-
- private LinkedList<Integer> processed = new LinkedList<Integer>();
-
- private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
-
- private RecordDeserializer<T1> reader1currentRecordDeserializer;
-
- private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
-
- private RecordDeserializer<T2> reader2currentRecordDeserializer;
-
- // 0 => none, 1 => reader (T1), 2 => reader (T2)
- private int currentReaderIndex;
-
- private boolean hasRequestedPartitions;
-
- protected CoBarrierBuffer barrierBuffer1;
- protected CoBarrierBuffer barrierBuffer2;
-
- public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
- super(new UnionInputGate(inputgate1, inputgate2));
-
- this.bufferReader1 = inputgate1;
- this.bufferReader2 = inputgate2;
-
- this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
- .getNumberOfInputChannels()];
- this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
- .getNumberOfInputChannels()];
-
- for (int i = 0; i < reader1RecordDeserializers.length; i++) {
- reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
- }
-
- for (int i = 0; i < reader2RecordDeserializers.length; i++) {
- reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
- }
-
- inputgate1.registerListener(this);
- inputgate2.registerListener(this);
-
- barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
- barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
-
- barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
- barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
- }
-
- public void requestPartitionsOnce() throws IOException, InterruptedException {
- if (!hasRequestedPartitions) {
- bufferReader1.requestPartitions();
- bufferReader2.requestPartitions();
-
- hasRequestedPartitions = true;
- }
- }
-
- @SuppressWarnings("unchecked")
- protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
-
- requestPartitionsOnce();
-
- while (true) {
- if (currentReaderIndex == 0) {
- if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
- return 0;
- }
-
- currentReaderIndex = getNextReaderIndexBlocking();
-
- }
-
- if (currentReaderIndex == 1) {
- while (true) {
- if (reader1currentRecordDeserializer != null) {
- RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
- .getNextRecord(target1);
-
- if (result.isBufferConsumed()) {
- reader1currentRecordDeserializer.getCurrentBuffer().recycle();
- reader1currentRecordDeserializer = null;
-
- currentReaderIndex = 0;
- }
-
- if (result.isFullRecord()) {
- return 1;
- }
- } else {
-
- final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
-
- if (boe.isBuffer()) {
- reader1currentRecordDeserializer = reader1RecordDeserializers[boe
- .getChannelIndex()];
- reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
- } else if (boe.getEvent() instanceof StreamingSuperstep) {
- barrierBuffer1.processSuperstep(boe);
- currentReaderIndex = 0;
-
- break;
- } else if (handleEvent(boe.getEvent())) {
- currentReaderIndex = 0;
-
- break;
- }
- }
- }
- } else if (currentReaderIndex == 2) {
- while (true) {
- if (reader2currentRecordDeserializer != null) {
- RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
- .getNextRecord(target2);
-
- if (result.isBufferConsumed()) {
- reader2currentRecordDeserializer.getCurrentBuffer().recycle();
- reader2currentRecordDeserializer = null;
-
- currentReaderIndex = 0;
- }
-
- if (result.isFullRecord()) {
- return 2;
- }
- } else {
- final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
-
- if (boe.isBuffer()) {
- reader2currentRecordDeserializer = reader2RecordDeserializers[boe
- .getChannelIndex()];
- reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
- } else if (boe.getEvent() instanceof StreamingSuperstep) {
- barrierBuffer2.processSuperstep(boe);
- currentReaderIndex = 0;
-
- break;
- } else if (handleEvent(boe.getEvent())) {
- currentReaderIndex = 0;
-
- break;
- }
- }
- }
- } else {
- throw new IllegalStateException("Bug: unexpected current reader index.");
- }
- }
- }
-
- protected int getNextReaderIndexBlocking() throws InterruptedException {
-
- Integer nextIndex = 0;
-
- while (processed.contains(nextIndex = availableRecordReaders.take())) {
- processed.remove(nextIndex);
- }
-
- if (nextIndex == 1) {
- if (barrierBuffer1.isAllBlocked()) {
- availableRecordReaders.addFirst(1);
- processed.add(2);
- return 2;
- } else {
- return 1;
- }
- } else {
- if (barrierBuffer2.isAllBlocked()) {
- availableRecordReaders.addFirst(2);
- processed.add(1);
- return 1;
- } else {
- return 2;
- }
-
- }
-
- }
-
- // ------------------------------------------------------------------------
- // Data availability notifications
- // ------------------------------------------------------------------------
-
- @Override
- public void onEvent(InputGate bufferReader) {
- addToAvailable(bufferReader);
- }
-
- protected void addToAvailable(InputGate bufferReader) {
- if (bufferReader == bufferReader1) {
- availableRecordReaders.add(1);
- } else if (bufferReader == bufferReader2) {
- availableRecordReaders.add(2);
- }
- }
-
- public void clearBuffers() {
- for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
- for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
- }
-
- @Override
- public void setReporter(AccumulatorRegistry.Reporter reporter) {
- for (AdaptiveSpanningRecordDeserializer serializer : reader1RecordDeserializers) {
- serializer.setReporter(reporter);
- }
- for (AdaptiveSpanningRecordDeserializer serializer : reader2RecordDeserializers) {
- serializer.setReporter(reporter);
- }
- }
-
- private class CoBarrierBuffer extends BarrierBuffer {
-
- private CoBarrierBuffer otherBuffer;
-
- public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
- super(inputGate, reader);
- }
-
- public void setOtherBarrierBuffer(CoBarrierBuffer other) {
- this.otherBuffer = other;
- }
-
- @Override
- protected void actOnAllBlocked() {
- if (otherBuffer.isAllBlocked()) {
- super.actOnAllBlocked();
- otherBuffer.releaseBlocks();
- }
- }
-
- }
-
- public void cleanup() throws IOException {
- try {
- barrierBuffer1.cleanup();
- } finally {
- barrierBuffer2.cleanup();
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
new file mode 100644
index 0000000..2f9d1d6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
+
+ private OutputSelectorWrapper<OUT> outputSelectorWrapper;
+
+ private List<Output<OUT>> allOutputs;
+
+ public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
+ this.outputSelectorWrapper = outputSelectorWrapper;
+ allOutputs = new ArrayList<Output<OUT>>();
+ }
+
+ public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
+ outputSelectorWrapper.addCollector(output, edge);
+ allOutputs.add((Output) output);
+ }
+
+ @Override
+ public void collect(StreamRecord<OUT> record) {
+ for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
+ output.collect(record);
+ }
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ for (Output<OUT> output : allOutputs) {
+ output.emitWatermark(mark);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
deleted file mode 100644
index 7f2a9c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class IndexedMutableReader<T extends IOReadableWritable> extends
- StreamingMutableRecordReader<T> {
-
- InputGate reader;
-
- public IndexedMutableReader(InputGate reader) {
- super(reader);
- this.reader = reader;
- }
-
- public int getNumberOfInputChannels() {
- return reader.getNumberOfInputChannels();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
deleted file mode 100644
index 2050e27..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-
-public class IndexedReaderIterator<T> extends ReaderIterator<T> {
-
- public IndexedReaderIterator(
- IndexedMutableReader<DeserializationDelegate<T>> reader,
- TypeSerializer<T> serializer) {
-
- super(reader, serializer);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
deleted file mode 100644
index 7883251..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.util.Collection;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-
-public class InputGateFactory {
-
- public static InputGate createInputGate(Collection<InputGate> inputGates) {
- return createInputGate(inputGates.toArray(new InputGate[inputGates.size()]));
- }
-
- public static InputGate createInputGate(InputGate[] inputGates) {
- if (inputGates.length <= 0) {
- throw new RuntimeException("No such input gate.");
- }
-
- if (inputGates.length < 2) {
- return inputGates[0];
- } else {
- return new UnionInputGate(inputGates);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
new file mode 100644
index 0000000..01e16fb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+
+/**
+ * Utility for dealing with input gates. This will either just return
+ * the single {@link InputGate} that was passed in or create a {@link UnionInputGate} if several
+ * {@link InputGate input gates} are given.
+ */
+public class InputGateUtil {
+
+ public static InputGate createInputGate(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2) {
+ List<InputGate> gates = new ArrayList<InputGate>(inputGates1.size() + inputGates2.size());
+ gates.addAll(inputGates1);
+ gates.addAll(inputGates2);
+ return createInputGate(gates.toArray(new InputGate[gates.size()]));
+ }
+
+ public static InputGate createInputGate(InputGate[] inputGates) {
+ if (inputGates.length <= 0) {
+ throw new RuntimeException("No such input gate.");
+ }
+
+ if (inputGates.length < 2) {
+ return inputGates[0];
+ } else {
+ return new UnionInputGate(inputGates);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
new file mode 100644
index 0000000..e9cbb7d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
+ */
+public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
+
+ private RecordWriter<SerializationDelegate> recordWriter;
+ private SerializationDelegate serializationDelegate;
+
+ @SuppressWarnings("unchecked")
+ public RecordWriterOutput(
+ RecordWriter<SerializationDelegate> recordWriter,
+ TypeSerializer<OUT> outSerializer,
+ boolean enableWatermarkMultiplexing) {
+ Preconditions.checkNotNull(recordWriter);
+
+ this.recordWriter = recordWriter;
+
+ StreamRecordSerializer<OUT> outRecordSerializer;
+ if (enableWatermarkMultiplexing) {
+ outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
+ } else {
+ outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer);
+ }
+
+ if (outSerializer != null) {
+ serializationDelegate = new SerializationDelegate(outRecordSerializer);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void collect(StreamRecord<OUT> record) {
+ serializationDelegate.setInstance(record);
+
+ try {
+ recordWriter.emit(serializationDelegate);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Emit failed: {}", e);
+ }
+ throw new RuntimeException("Element emission failed.", e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void emitWatermark(Watermark mark) {
+ serializationDelegate.setInstance(mark);
+ try {
+ recordWriter.broadcastEmit(serializationDelegate);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Watermark emit failed: {}", e);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (recordWriter instanceof StreamRecordWriter) {
+ ((StreamRecordWriter) recordWriter).close();
+ } else {
+ try {
+ recordWriter.flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void clearBuffers() {
+ recordWriter.clearBuffers();
+ }
+
+ public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+ recordWriter.broadcastEvent(barrier);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
new file mode 100644
index 0000000..e665710
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
+ *
+ * <p>
+ * Besides handing records to the operator, this reader tracks the latest {@link Watermark}
+ * seen on every input channel and forwards a new {@link Watermark} to the operator whenever
+ * the minimum over all channels advances.
+ *
+ * @param <IN> The type of the record that can be read with this record reader.
+ */
+public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBase, StreamingReader {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
+
+    /** One deserializer per input channel, indexed by channel index. */
+    private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers;
+
+    /** Deserializer of the buffer currently being read, or null if a new buffer is needed. */
+    private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer;
+
+    // We need to keep track of the channel from which a buffer came, so that we can
+    // appropriately map the watermarks to input channels
+    int currentChannel = -1;
+
+    private boolean isFinished;
+
+    private final BarrierBuffer barrierBuffer;
+
+    /** Latest watermark timestamp seen per input channel. */
+    private final long[] watermarks;
+
+    /** Timestamp of the last watermark forwarded to the operator. */
+    private long lastEmittedWatermark;
+
+    private final DeserializationDelegate deserializationDelegate;
+
+    /**
+     * Creates a processor reading from the given input gates.
+     *
+     * @param inputGates the gates to read from; they are combined via {@code InputGateUtil.createInputGate}
+     * @param inputSerializer serializer for the record values
+     * @param enableWatermarkMultiplexing if true, a {@link MultiplexingStreamRecordSerializer} is used,
+     *                                    otherwise a plain {@link StreamRecordSerializer}
+     */
+    @SuppressWarnings("unchecked")
+    public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
+        super(InputGateUtil.createInputGate(inputGates));
+
+        barrierBuffer = new BarrierBuffer(inputGate, this);
+
+        StreamRecordSerializer<IN> inputRecordSerializer;
+        if (enableWatermarkMultiplexing) {
+            inputRecordSerializer = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
+        } else {
+            inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
+        }
+        this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer);
+
+        // Initialize one deserializer per input channel
+        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+        for (int i = 0; i < recordDeserializers.length; i++) {
+            recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
+        }
+
+        // Every channel starts at the lowest possible watermark
+        watermarks = new long[inputGate.getNumberOfInputChannels()];
+        for (int i = 0; i < watermarks.length; i++) {
+            watermarks[i] = Long.MIN_VALUE;
+        }
+        lastEmittedWatermark = Long.MIN_VALUE;
+    }
+
+    /**
+     * Reads from the input until one record has been passed to the operator.
+     *
+     * <p>Watermarks encountered on the way are handled internally and do not end the call.
+     *
+     * @param streamOperator the operator receiving records and watermarks
+     * @return true if a record was processed; false if the input is finished or the end of a
+     *         superstep was reached
+     * @throws Exception propagated from reading the input or from the operator
+     */
+    @SuppressWarnings("unchecked")
+    public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
+        if (isFinished) {
+            return false;
+        }
+
+        while (true) {
+            if (currentRecordDeserializer != null) {
+                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
+
+                if (result.isBufferConsumed()) {
+                    currentRecordDeserializer.getCurrentBuffer().recycle();
+                    currentRecordDeserializer = null;
+                }
+
+                if (result.isFullRecord()) {
+                    // Fetch the deserialized instance once and dispatch on its type
+                    Object recordOrWatermark = deserializationDelegate.getInstance();
+
+                    if (recordOrWatermark instanceof Watermark) {
+                        handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
+                        continue;
+                    }
+
+                    // now we can do the actual processing
+                    StreamRecord<IN> record = (StreamRecord<IN>) recordOrWatermark;
+                    StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
+                    if (ctx != null) {
+                        ctx.setNextInput(record);
+                    }
+                    streamOperator.processElement(record);
+                    return true;
+                }
+            }
+
+            final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+            if (bufferOrEvent.isBuffer()) {
+                currentChannel = bufferOrEvent.getChannelIndex();
+                currentRecordDeserializer = recordDeserializers[currentChannel];
+                currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+            } else {
+                // Event received
+                final AbstractEvent event = bufferOrEvent.getEvent();
+
+                if (event instanceof CheckpointBarrier) {
+                    barrierBuffer.processBarrier(bufferOrEvent);
+                } else {
+                    if (handleEvent(event)) {
+                        if (inputGate.isFinished()) {
+                            if (!barrierBuffer.isEmpty()) {
+                                throw new RuntimeException("BarrierBuffer should be empty at this point");
+                            }
+                            isFinished = true;
+                            return false;
+                        } else if (hasReachedEndOfSuperstep()) {
+                            return false;
+                        } // else: More data is coming...
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Records a watermark for one channel and forwards a new watermark to the operator if the
+     * minimum over all channels moved past the last emitted one.
+     *
+     * <p>A watermark that is not larger than the channel's current value is ignored.
+     */
+    private void handleWatermark(OneInputStreamOperator<IN, ?> operator, Watermark mark, int channelIndex) throws Exception {
+        long watermarkMillis = mark.getTimestamp();
+        if (watermarkMillis <= watermarks[channelIndex]) {
+            return;
+        }
+        watermarks[channelIndex] = watermarkMillis;
+
+        long newMinWatermark = Long.MAX_VALUE;
+        for (long watermark : watermarks) {
+            if (watermark < newMinWatermark) {
+                newMinWatermark = watermark;
+            }
+        }
+
+        if (newMinWatermark > lastEmittedWatermark) {
+            lastEmittedWatermark = newMinWatermark;
+            operator.processWatermark(new Watermark(lastEmittedWatermark));
+        }
+    }
+
+    /** Passes the reporter on to every per-channel deserializer. */
+    @Override
+    public void setReporter(AccumulatorRegistry.Reporter reporter) {
+        for (RecordDeserializer<?> deserializer : recordDeserializers) {
+            deserializer.setReporter(reporter);
+        }
+    }
+
+    /** Recycles the current buffer of every deserializer that still holds a non-recycled one. */
+    public void clearBuffers() {
+        for (RecordDeserializer<?> deserializer : recordDeserializers) {
+            Buffer buffer = deserializer.getCurrentBuffer();
+            if (buffer != null && !buffer.isRecycled()) {
+                buffer.recycle();
+            }
+        }
+    }
+
+    /** Cleans up the barrier buffer. */
+    public void cleanup() throws IOException {
+        barrierBuffer.cleanup();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index c212346..abae9a4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -61,6 +61,14 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
}
}
+ /**
+  * Broadcasts the record via the superclass implementation and, if {@code flushAlways}
+  * is set, flushes immediately afterwards.
+  *
+  * @param record the record to broadcast
+  * @throws IOException propagated from emitting or flushing
+  * @throws InterruptedException propagated from emitting or flushing
+  */
+ @Override
+ public void broadcastEmit(T record) throws IOException, InterruptedException {
+ super.broadcastEmit(record);
+ if (flushAlways) {
+ flush();
+ }
+ }
+
public void close() {
try {
if (outputFlusher != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
new file mode 100644
index 0000000..1fe98bb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}.
+ *
+ * <p>
+ * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers
+ * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances.
+ *
+ * @param <IN1> The type of the records that arrive on the first input
+ * @param <IN2> The type of the records that arrive on the second input
+ */
+public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements ReaderBase, StreamingReader {
+
+ @SuppressWarnings("unused")
+ private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
+
+ private final RecordDeserializer[] recordDeserializers;
+
+ private RecordDeserializer currentRecordDeserializer;
+
+ // We need to keep track of the channel from which a buffer came, so that we can
+ // appropriately map the watermarks to input channels
+ int currentChannel = -1;
+
+ private boolean isFinished;
+
+ private final BarrierBuffer barrierBuffer;
+
+ private long[] watermarks1;
+ private long lastEmittedWatermark1;
+
+ private long[] watermarks2;
+ private long lastEmittedWatermark2;
+
+ private int numInputChannels1;
+ private int numInputChannels2;
+
+ private DeserializationDelegate deserializationDelegate1;
+ private DeserializationDelegate deserializationDelegate2;
+
+ @SuppressWarnings("unchecked")
+ public StreamTwoInputProcessor(
+ Collection<InputGate> inputGates1,
+ Collection<InputGate> inputGates2,
+ TypeSerializer<IN1> inputSerializer1,
+ TypeSerializer<IN2> inputSerializer2,
+ boolean enableWatermarkMultiplexing) {
+ super(InputGateUtil.createInputGate(inputGates1, inputGates2));
+
+ barrierBuffer = new BarrierBuffer(inputGate, this);
+
+ StreamRecordSerializer<IN1> inputRecordSerializer1;
+ if (enableWatermarkMultiplexing) {
+ inputRecordSerializer1 = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
+ } else {
+ inputRecordSerializer1 = new StreamRecordSerializer<IN1>(inputSerializer1);
+ }
+ this.deserializationDelegate1 = new NonReusingDeserializationDelegate(inputRecordSerializer1);
+
+ StreamRecordSerializer<IN2> inputRecordSerializer2;
+ if (enableWatermarkMultiplexing) {
+ inputRecordSerializer2 = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
+ } else {
+ inputRecordSerializer2 = new StreamRecordSerializer<IN2>(inputSerializer2);
+ }
+ this.deserializationDelegate2 = new NonReusingDeserializationDelegate(inputRecordSerializer2);
+
+ // Initialize one deserializer per input channel
+ this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
+ .getNumberOfInputChannels()];
+ for (int i = 0; i < recordDeserializers.length; i++) {
+ recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer();
+ }
+
+ // determine which unioned channels belong to input 1 and which belong to input 2
+ numInputChannels1 = 0;
+ for (InputGate gate: inputGates1) {
+ numInputChannels1 += gate.getNumberOfInputChannels();
+ }
+ numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
+
+ watermarks1 = new long[numInputChannels1];
+ for (int i = 0; i < numInputChannels1; i++) {
+ watermarks1[i] = Long.MIN_VALUE;
+ }
+ lastEmittedWatermark1 = Long.MIN_VALUE;
+
+ watermarks2 = new long[numInputChannels2];
+ for (int i = 0; i < numInputChannels2; i++) {
+ watermarks2[i] = Long.MIN_VALUE;
+ }
+ lastEmittedWatermark2 = Long.MIN_VALUE;
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception {
+ if (isFinished) {
+ return false;
+ }
+
+ while (true) {
+ if (currentRecordDeserializer != null) {
+ DeserializationResult result;
+ if (currentChannel < numInputChannels1) {
+ result = currentRecordDeserializer.getNextRecord(deserializationDelegate1);
+ } else {
+ result = currentRecordDeserializer.getNextRecord(deserializationDelegate2);
+ }
+
+ if (result.isBufferConsumed()) {
+ currentRecordDeserializer.getCurrentBuffer().recycle();
+ currentRecordDeserializer = null;
+ }
+
+ if (result.isFullRecord()) {
+ if (currentChannel < numInputChannels1) {
+ Object recordOrWatermark = deserializationDelegate1.getInstance();
+ if (recordOrWatermark instanceof Watermark) {
+ handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
+ continue;
+ } else {
+ streamOperator.processElement1((StreamRecord<IN1>) deserializationDelegate1.getInstance());
+ return true;
+
+ }
+ } else {
+ Object recordOrWatermark = deserializationDelegate2.getInstance();
+ if (recordOrWatermark instanceof Watermark) {
+ handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
+ continue;
+ } else {
+ streamOperator.processElement2((StreamRecord<IN2>) deserializationDelegate2.getInstance());
+ return true;
+ }
+ }
+ }
+ }
+
+ final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
+
+ if (bufferOrEvent.isBuffer()) {
+ currentChannel = bufferOrEvent.getChannelIndex();
+ currentRecordDeserializer = recordDeserializers[currentChannel];
+ currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
+
+ } else {
+ // Event received
+ final AbstractEvent event = bufferOrEvent.getEvent();
+
+ if (event instanceof CheckpointBarrier) {
+ barrierBuffer.processBarrier(bufferOrEvent);
+ } else {
+ if (handleEvent(event)) {
+ if (inputGate.isFinished()) {
+ if (!barrierBuffer.isEmpty()) {
+ throw new RuntimeException("BarrierBuffer should be empty at this point");
+ }
+ isFinished = true;
+ return false;
+ } else if (hasReachedEndOfSuperstep()) {
+ return false;
+ } // else: More data is coming...
+ }
+ }
+ }
+ }
+ }
+
+ private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex) throws Exception {
+ if (channelIndex < numInputChannels1) {
+ long watermarkMillis = mark.getTimestamp();
+ if (watermarkMillis > watermarks1[channelIndex]) {
+ watermarks1[channelIndex] = watermarkMillis;
+ long newMinWatermark = Long.MAX_VALUE;
+ for (long aWatermarks1 : watermarks1) {
+ if (aWatermarks1 < newMinWatermark) {
+ newMinWatermark = aWatermarks1;
+ }
+ }
+ if (newMinWatermark > lastEmittedWatermark1) {
+ lastEmittedWatermark1 = newMinWatermark;
+ operator.processWatermark1(new Watermark(lastEmittedWatermark1));
+ }
+ }
+ } else {
+ channelIndex = channelIndex - numInputChannels1;
+ long watermarkMillis = mark.getTimestamp();
+ if (watermarkMillis > watermarks2[channelIndex]) {
+ watermarks2[channelIndex] = watermarkMillis;
+ long newMinWatermark = Long.MAX_VALUE;
+ for (long aWatermarks2 : watermarks2) {
+ if (aWatermarks2 < newMinWatermark) {
+ newMinWatermark = aWatermarks2;
+ }
+ }
+ if (newMinWatermark > lastEmittedWatermark2) {
+ lastEmittedWatermark2 = newMinWatermark;
+ operator.processWatermark2(new Watermark(lastEmittedWatermark2));
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void setReporter(AccumulatorRegistry.Reporter reporter) {
+ for (RecordDeserializer<?> deserializer : recordDeserializers) {
+ deserializer.setReporter(reporter);
+ }
+ }
+
+ public void clearBuffers() {
+ for (RecordDeserializer<?> deserializer : recordDeserializers) {
+ Buffer buffer = deserializer.getCurrentBuffer();
+ if (buffer != null && !buffer.isRecycled()) {
+ buffer.recycle();
+ }
+ }
+ }
+
+ public void cleanup() throws IOException {
+ barrierBuffer.cleanup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
deleted file mode 100644
index 44f9a86..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- *
- * @param <T>
- * The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends
- AbstractReader implements ReaderBase, StreamingReader {
-
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
- private final RecordDeserializer<T>[] recordDeserializers;
-
- private RecordDeserializer<T> currentRecordDeserializer;
-
- private boolean isFinished;
-
- private final BarrierBuffer barrierBuffer;
-
-
- @SuppressWarnings("unchecked")
- protected StreamingAbstractRecordReader(InputGate inputGate) {
- super(inputGate);
- barrierBuffer = new BarrierBuffer(inputGate, this);
-
- // Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
- .getNumberOfInputChannels()];
- for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
- }
- }
-
- protected boolean getNextRecord(T target) throws IOException, InterruptedException {
- if (isFinished) {
- return false;
- }
-
- while (true) {
- if (currentRecordDeserializer != null) {
- DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
-
- if (result.isBufferConsumed()) {
- Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer();
- currentBuffer.recycle();
- currentRecordDeserializer = null;
- }
-
- if (result.isFullRecord()) {
- return true;
- }
- }
-
- final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
- if (bufferOrEvent.isBuffer()) {
- currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
-
- if (event instanceof StreamingSuperstep) {
- barrierBuffer.processSuperstep(bufferOrEvent);
- } else {
- if (handleEvent(event)) {
- if (inputGate.isFinished()) {
- if (!barrierBuffer.isEmpty()) {
- throw new RuntimeException(
- "BarrierBuffer should be empty at this point");
- }
- isFinished = true;
- return false;
- } else if (hasReachedEndOfSuperstep()) {
- return false;
- } // else: More data is coming...
- }
- }
- }
- }
- }
-
- public void clearBuffers() {
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
- }
-
- public void cleanup() throws IOException {
- barrierBuffer.cleanup();
- }
-
- @Override
- public void setReporter(AccumulatorRegistry.Reporter reporter) {
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- deserializer.setReporter(reporter);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
deleted file mode 100644
index 1356af5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
- StreamingAbstractRecordReader<T> implements MutableReader<T> {
-
- public StreamingMutableRecordReader(InputGate inputGate) {
- super(inputGate);
- }
-
- @Override
- public boolean next(final T target) throws IOException, InterruptedException {
- return getNextRecord(target);
- }
-
- @Override
- public void clearBuffers() {
- super.clearBuffers();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 75867cd..6c40c03 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -44,10 +44,14 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
}
@Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
-
- K key = record.getInstance().getKey(keySelector);
+ public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
+
+ K key = null;
+ try {
+ key = keySelector.getKey(record.getInstance().getValue());
+ } catch (Exception e) {
+ throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
+ }
returnArray[0] = partitioner.partition(key,
numberOfOutputChannels);
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
index 08c431b..7026d45 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
@@ -42,8 +42,13 @@ public class FieldsPartitioner<T> extends StreamPartitioner<T> {
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
- returnArray[0] = Math.abs(record.getInstance().getKey(keySelector).hashCode()
- % numberOfOutputChannels);
+ Object key;
+ try {
+ key = keySelector.getKey(record.getInstance().getValue());
+ } catch (Exception e) {
+ throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
+ }
+ returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
return returnArray;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
new file mode 100644
index 0000000..715f0d2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal
+ * {@link TypeSerializer}, instead, this is only used at the
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
+ * {@link StreamRecord StreamRecords} and {@link Watermark Watermarks}. This serializer
+ * can handle both of them, therefore it returns {@link Object}; the result has
+ * to be cast to the correct type.
+ *
+ * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
+ */
+public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSerializer<T> {
+
+ private static final long IS_WATERMARK = Long.MIN_VALUE; // timestamp tag that marks a Watermark on the wire
+
+ private static final long serialVersionUID = 1L;
+
+ public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
+ super(serializer);
+ if (serializer instanceof MultiplexingStreamRecordSerializer) {
+ throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object copy(Object from) {
+ // we can reuse the timestamp since it is a primitive long
+ if (from instanceof StreamRecord) {
+ StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+ return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
+ } else if (from instanceof Watermark) {
+ // is immutable
+ return from;
+ } else {
+ throw new RuntimeException("Cannot copy " + from);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object copy(Object from, Object reuse) {
+ if (from instanceof StreamRecord && reuse instanceof StreamRecord) {
+ StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+ StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+
+ reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), fromRecord.getTimestamp());
+ return reuse;
+ } else if (from instanceof Watermark) {
+ // is immutable
+ return from;
+ } else {
+ throw new RuntimeException("Cannot copy " + from);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void serialize(Object value, DataOutputView target) throws IOException {
+ if (value instanceof StreamRecord) {
+ StreamRecord<T> record = (StreamRecord<T>) value;
+ target.writeLong(record.getTimestamp());
+ typeSerializer.serialize(record.getValue(), target);
+ } else if (value instanceof Watermark) {
+ target.writeLong(IS_WATERMARK);
+ target.writeLong(((Watermark) value).getTimestamp());
+ }
+ }
+
+ @Override
+ public Object deserialize(DataInputView source) throws IOException {
+ long millis = source.readLong();
+
+ if (millis == IS_WATERMARK) {
+ return new Watermark(source.readLong());
+ } else {
+ T element = typeSerializer.deserialize(source);
+ return new StreamRecord<T>(element, millis);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object deserialize(Object reuse, DataInputView source) throws IOException {
+ long millis = source.readLong();
+
+ if (millis == IS_WATERMARK) {
+ return new Watermark(source.readLong());
+
+ } else {
+ StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+ T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
+ reuseRecord.replace(element, millis);
+ return reuse;
+ }
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ long millis = source.readLong();
+ target.writeLong(millis);
+
+ if (millis == IS_WATERMARK) {
+ target.writeLong(source.readLong());
+ } else {
+ typeSerializer.copy(source, target);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 66a64b3..aff030e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -17,87 +17,106 @@
package org.apache.flink.streaming.runtime.streamrecord;
-import java.io.Serializable;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-
/**
- * Object for wrapping a tuple or other object with ID used for sending records
- * between streaming task in Apache Flink stream processing.
+ * One value in a data stream. This stores the value and the associated timestamp.
*/
-public class StreamRecord<T> implements Serializable {
- private static final long serialVersionUID = 1L;
+public class StreamRecord<T> {
- private T streamObject;
- public boolean isTuple;
+ // We store it as Object so that we can reuse a StreamRecord for emitting
+ // elements of a different type while still reusing the timestamp.
+ private Object value;
+ private long timestamp;
/**
- * Creates an empty StreamRecord
+ * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to
+ * {@code Long.MIN_VALUE + 1}, the smallest value that is not reserved for tagging watermarks.
*/
- public StreamRecord() {
+ public StreamRecord(T value) {
+ this(value, Long.MIN_VALUE + 1);
+ // be careful to set it to MIN_VALUE + 1, because MIN_VALUE is reserved as the special tag
+ // to signify that a transmitted element is a Watermark in MultiplexingStreamRecordSerializer
}
/**
- * Gets the wrapped object from the StreamRecord
- *
- * @return The object wrapped
+ * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
+ * given timestamp.
+ *
+ * @param value The value to wrap in this {@link StreamRecord}
+ * @param timestamp The timestamp in milliseconds
*/
- public T getObject() {
- return streamObject;
+ public StreamRecord(T value, long timestamp) {
+ this.value = value;
+ this.timestamp = timestamp;
}
/**
- * Gets the field of the contained object at the given position. If a tuple
- * is wrapped then the getField method is invoked. If the StreamRecord
- * contains and object of Basic types only position 0 could be returned.
- *
- * @param pos
- * Position of the field to get.
- * @return Returns the object contained in the position.
+ * Returns the value wrapped in this stream value.
*/
- public Object getField(int pos) {
- if (isTuple) {
- return ((Tuple) streamObject).getField(pos);
- } else {
- if (pos == 0) {
- return streamObject;
- } else {
- throw new IndexOutOfBoundsException();
- }
- }
+ @SuppressWarnings("unchecked")
+ public T getValue() {
+ return (T) value;
}
/**
- * Extracts key for the stored object using the keySelector provided.
- *
- * @param keySelector
- * KeySelector for extracting the key
- * @return The extracted key
+ * Returns the timestamp associated with this stream value in milliseconds.
*/
- public <R> R getKey(KeySelector<T, R> keySelector) {
- try {
- return keySelector.getKey(streamObject);
- } catch (Exception e) {
- throw new RuntimeException("Failed to extract key: " + e.getMessage());
- }
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Replace the currently stored value by the given new value. This returns a StreamRecord
+ * with the generic type parameter that matches the new value while keeping the old
+ * timestamp.
+ *
+ * @param element Element to set in this stream value
+ * @return Returns the StreamRecord with replaced value
+ */
+ @SuppressWarnings("unchecked")
+ public <X> StreamRecord<X> replace(X element) {
+ this.value = element;
+ return (StreamRecord<X>) this;
}
/**
- * Sets the object stored
- *
- * @param object
- * Object to set
- * @return Returns the StreamRecord object
+ * Replace the currently stored value by the given new value and the currently stored
+ * timestamp with the new timestamp. This returns a StreamRecord with the generic type
+ * parameter that matches the new value.
+ *
+ * @param value The new value to wrap in this {@link StreamRecord}
+ * @param timestamp The new timestamp in milliseconds
+ * @return Returns the StreamRecord with replaced value
*/
- public StreamRecord<T> setObject(T object) {
- this.streamObject = object;
- return this;
+ @SuppressWarnings("unchecked")
+ public <X> StreamRecord<X> replace(X value, long timestamp) {
+ this.timestamp = timestamp;
+ this.value = value;
+ return (StreamRecord<X>) this;
}
@Override
- public String toString() {
- return streamObject.toString();
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ StreamRecord<?> that = (StreamRecord<?>) o;
+ // value may be null (hashCode() already allows for that), so compare it null-safely
+ return timestamp == that.timestamp && (value == null ? that.value == null : value.equals(that.value));
}
+ @Override
+ public int hashCode() {
+ int result = value != null ? value.hashCode() : 0;
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Record{" + value + "; " + timestamp + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 4499499..b05eb36 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -20,26 +20,35 @@ package org.apache.flink.streaming.runtime.streamrecord;
import java.io.IOException;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
+/**
+ * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
+ * the element.
+ *
+ * <p>
+ * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
+ * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
+ * stream with {@link StreamRecord StreamRecords}.
+ *
+ * @see MultiplexingStreamRecordSerializer
+ *
+ * @param <T> The type of value in the {@link StreamRecord}
+ */
+public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
private static final long serialVersionUID = 1L;
- private final TypeSerializer<T> typeSerializer;
- private final boolean isTuple;
+ protected final TypeSerializer<T> typeSerializer;
- public StreamRecordSerializer(TypeInformation<T> typeInfo, ExecutionConfig executionConfig) {
- this.typeSerializer = typeInfo.createSerializer(executionConfig);
- this.isTuple = typeInfo.isTupleType();
- }
-
- public TypeSerializer<T> getObjectSerializer() {
- return typeSerializer;
+ public StreamRecordSerializer(TypeSerializer<T> serializer) {
+ if (serializer instanceof StreamRecordSerializer) {
+ throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+ }
+ this.typeSerializer = Preconditions.checkNotNull(serializer);
}
@Override
@@ -48,34 +57,34 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
}
@Override
- public StreamRecordSerializer<T> duplicate() {
+ @SuppressWarnings("unchecked")
+ public TypeSerializer duplicate() {
return this;
}
@Override
- public StreamRecord<T> createInstance() {
+ public Object createInstance() {
try {
- StreamRecord<T> t = new StreamRecord<T>();
- t.isTuple = isTuple;
- t.setObject(typeSerializer.createInstance());
- return t;
+ return new StreamRecord<T>(typeSerializer.createInstance());
} catch (Exception e) {
throw new RuntimeException("Cannot instantiate StreamRecord.", e);
}
}
@Override
- public StreamRecord<T> copy(StreamRecord<T> from) {
- StreamRecord<T> rec = new StreamRecord<T>();
- rec.isTuple = from.isTuple;
- rec.setObject(typeSerializer.copy(from.getObject()));
- return rec;
+ @SuppressWarnings("unchecked")
+ public Object copy(Object from) {
+ StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+ return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
}
@Override
- public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
- reuse.isTuple = from.isTuple;
- reuse.setObject(typeSerializer.copy(from.getObject(), reuse.getObject()));
+ @SuppressWarnings("unchecked")
+ public Object copy(Object from, Object reuse) {
+ StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+ StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+ // carry over the source timestamp so both copy variants produce the same record
+ reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), fromRecord.getTimestamp());
return reuse;
}
@@ -85,26 +94,29 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
}
@Override
- public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
- typeSerializer.serialize(value.getObject(), target);
+ @SuppressWarnings("unchecked")
+ public void serialize(Object value, DataOutputView target) throws IOException {
+ StreamRecord<T> record = (StreamRecord<T>) value;
+ typeSerializer.serialize(record.getValue(), target);
}
@Override
- public StreamRecord<T> deserialize(DataInputView source) throws IOException {
- StreamRecord<T> record = new StreamRecord<T>();
- record.isTuple = this.isTuple;
- record.setObject(typeSerializer.deserialize(source));
- return record;
+ public Object deserialize(DataInputView source) throws IOException {
+ T element = typeSerializer.deserialize(source);
+ return new StreamRecord<T>(element, 0);
}
@Override
- public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
- reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
+ @SuppressWarnings("unchecked")
+ public Object deserialize(Object reuse, DataInputView source) throws IOException {
+ StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+ T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
+ reuseRecord.replace(element, 0);
return reuse;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
- // Needs to be implemented
+ typeSerializer.copy(source, target);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
new file mode 100644
index 0000000..d94b5b4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+
+/**
+ * Checkpoint barriers are used to synchronize checkpoints throughout the streaming topology. The
+ * barriers are emitted by the sources when instructed to do so by the JobManager. When an
+ * operator receives a {@link CheckpointBarrier} on one of its inputs it must block processing
+ * of further elements on this input until all inputs received the checkpoint barrier
+ * corresponding to that checkpoint. Once all inputs received the checkpoint barrier for
+ * a checkpoint the operator is to perform the checkpoint and then broadcast the barrier to
+ * downstream operators.
+ *
+ * <p>
+ * The checkpoint barrier IDs are advancing. Once an operator receives a {@link CheckpointBarrier}
+ * for a checkpoint with a higher id it is to discard all barriers that it received from previous
+ * checkpoints and unblock all other inputs.
+ */
+public class CheckpointBarrier extends TaskEvent {
+
+ protected long id;
+ protected long timestamp;
+
+ public CheckpointBarrier() {}
+
+ public CheckpointBarrier(long id, long timestamp) {
+ this.id = id;
+ this.timestamp = timestamp;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeLong(id);
+ out.writeLong(timestamp);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ id = in.readLong();
+ timestamp = in.readLong();
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof CheckpointBarrier)) {
+ return false;
+ }
+ else {
+ CheckpointBarrier that = (CheckpointBarrier) other;
+ return that.id == this.id && that.timestamp == this.timestamp;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+ }
+}