You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:49 UTC
[33/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
deleted file mode 100644
index 2e415f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.windows;
-
-/**
- * A {@code Window} is a grouping of elements into finite buckets. Windows have a maximum timestamp
- * which means that, at some point, all elements that go into one window will have arrived.
- *
- * <p>
- * Subclasses should implement {@code equals()} and {@code hashCode()} so that logically
- * same windows are treated the same.
- */
-public abstract class Window {
-
- public abstract long maxTimestamp();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
deleted file mode 100644
index 863f7ac..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The barrier buffer is a {@link CheckpointBarrierHandler} that blocks inputs with barriers until
- * all inputs have received the barrier for a given checkpoint.
- *
- * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
- * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
- * the blocks are released.</p>
- */
-public class BarrierBuffer implements CheckpointBarrierHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
- /** The gate that the buffer draws its input from */
- private final InputGate inputGate;
-
- /** Flags that indicate whether a channel is currently blocked/buffered */
- private final boolean[] blockedChannels;
-
- /** The total number of channels that this buffer handles data from */
- private final int totalNumberOfInputChannels;
-
- /** The utility to write blocked data to a file channel */
- private final BufferSpiller bufferSpiller;
-
- /** The pending blocked buffer/event sequences. Must be consumed before requesting
- * further data from the input gate. */
- private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
-
- /** The sequence of buffers/events that has been unblocked and must now be consumed
- * before requesting further data from the input gate */
- private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
-
- /** Handler that receives the checkpoint notifications */
- private EventListener<CheckpointBarrier> checkpointHandler;
-
- /** The ID of the checkpoint for which we expect barriers */
- private long currentCheckpointId = -1L;
-
- /** The number of received barriers (= number of blocked/buffered channels) */
- private int numBarriersReceived;
-
- /** The number of already closed channels */
- private int numClosedChannels;
-
- /** Flag to indicate whether we have drawn all available input */
- private boolean endOfStream;
-
- /**
- *
- * @param inputGate The input gate to draw the buffers and events from.
- * @param ioManager The I/O manager that gives access to the temp directories.
- *
- * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
- */
- public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
- this.inputGate = inputGate;
- this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-
- this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
- this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
- }
-
- // ------------------------------------------------------------------------
- // Buffer and barrier handling
- // ------------------------------------------------------------------------
-
- @Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
- while (true) {
- // process buffered BufferOrEvents before grabbing new ones
- BufferOrEvent next;
- if (currentBuffered == null) {
- next = inputGate.getNextBufferOrEvent();
- }
- else {
- next = currentBuffered.getNext();
- if (next == null) {
- completeBufferedSequence();
- return getNextNonBlocked();
- }
- }
-
- if (next != null) {
- if (isBlocked(next.getChannelIndex())) {
- // if the channel is blocked, we just store the BufferOrEvent
- bufferSpiller.add(next);
- }
- else if (next.isBuffer()) {
- return next;
- }
- else if (next.getEvent().getClass() == CheckpointBarrier.class) {
- if (!endOfStream) {
- // process barriers only if there is a chance of the checkpoint completing
- processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
- }
- }
- else {
- if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
- numClosedChannels++;
- // no chance to complete this checkpoint
- releaseBlocks();
- }
- return next;
- }
- }
- else if (!endOfStream) {
- // end of stream. we feed the data that is still buffered
- endOfStream = true;
- releaseBlocks();
- return getNextNonBlocked();
- }
- else {
- return null;
- }
- }
- }
-
- private void completeBufferedSequence() throws IOException {
- currentBuffered.cleanup();
- currentBuffered = queuedBuffered.pollFirst();
- if (currentBuffered != null) {
- currentBuffered.open();
- }
- }
-
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
- final long barrierId = receivedBarrier.getId();
-
- if (numBarriersReceived > 0) {
- // subsequent barrier of a checkpoint.
- if (barrierId == currentCheckpointId) {
- // regular case
- onBarrier(channelIndex);
- }
- else if (barrierId > currentCheckpointId) {
- // we did not complete the current checkpoint
- LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.", barrierId, currentCheckpointId);
-
- releaseBlocks();
- currentCheckpointId = barrierId;
- onBarrier(channelIndex);
- }
- else {
- // ignore trailing barrier from aborted checkpoint
- return;
- }
-
- }
- else if (barrierId > currentCheckpointId) {
- // first barrier of a new checkpoint
- currentCheckpointId = barrierId;
- onBarrier(channelIndex);
- }
- else {
- // trailing barrier from previous (skipped) checkpoint
- return;
- }
-
- // check if we have all barriers
- if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received all barrier, triggering checkpoint {} at {}",
- receivedBarrier.getId(), receivedBarrier.getTimestamp());
- }
-
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
- }
-
- releaseBlocks();
- }
- }
-
- @Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
- }
- else {
- throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
- }
- }
-
- @Override
- public boolean isEmpty() {
- return currentBuffered == null;
- }
-
- @Override
- public void cleanup() throws IOException {
- bufferSpiller.close();
- if (currentBuffered != null) {
- currentBuffered.cleanup();
- }
- for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
- seq.cleanup();
- }
- }
-
- /**
- * Checks whether the channel with the given index is blocked.
- *
- * @param channelIndex The channel index to check.
- * @return True if the channel is blocked, false if not.
- */
- private boolean isBlocked(int channelIndex) {
- return blockedChannels[channelIndex];
- }
-
- /**
- * Blocks the given channel index, from which a barrier has been received.
- *
- * @param channelIndex The channel index to block.
- */
- private void onBarrier(int channelIndex) throws IOException {
- if (!blockedChannels[channelIndex]) {
- blockedChannels[channelIndex] = true;
- numBarriersReceived++;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier from channel " + channelIndex);
- }
- }
- else {
- throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
- }
- }
-
- /**
- * Releases the blocks on all channels. Makes sure the just written data
- * is the next to be consumed.
- */
- private void releaseBlocks() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Releasing blocks");
- }
-
- for (int i = 0; i < blockedChannels.length; i++) {
- blockedChannels[i] = false;
- }
- numBarriersReceived = 0;
-
- if (currentBuffered == null) {
- // common case: no more buffered data
- currentBuffered = bufferSpiller.rollOver();
- if (currentBuffered != null) {
- currentBuffered.open();
- }
- }
- else {
- // uncommon case: buffered data pending
- // push back the pending data, if we have any
-
- // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
- BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer();
- if (bufferedNow != null) {
- bufferedNow.open();
- queuedBuffered.addFirst(currentBuffered);
- currentBuffered = bufferedNow;
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // For Testing
- // ------------------------------------------------------------------------
-
- /**
- * Gets the ID defining the current pending, or just completed, checkpoint.
- *
- * @return The ID of the pending or completed checkpoint.
- */
- public long getCurrentCheckpointId() {
- return this.currentCheckpointId;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
- currentCheckpointId, numBarriersReceived, numClosedChannels);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
deleted file mode 100644
index 119fb23..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-
-/**
- * The BarrierTracker keeps track of what checkpoint barriers have been received from
- * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID,
- * it notifies its listener of a completed checkpoint.
- *
- * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
- * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
- *
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
- */
-public class BarrierTracker implements CheckpointBarrierHandler {
-
- /** The tracker tracks a maximum number of checkpoints, for which some, but not all
- * barriers have yet arrived. */
- private static final int MAX_CHECKPOINTS_TO_TRACK = 50;
-
- /** The input gate, to draw the buffers and events from */
- private final InputGate inputGate;
-
- /** The number of channels. Once that many barriers have been received for a checkpoint,
- * the checkpoint is considered complete. */
- private final int totalNumberOfInputChannels;
-
- /** All checkpoints for which some (but not all) barriers have been received,
- * and that are not yet known to be subsumed by newer checkpoints */
- private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
-
- /** The listener to be notified on complete checkpoints */
- private EventListener<CheckpointBarrier> checkpointHandler;
-
- /** The highest checkpoint ID encountered so far */
- private long latestPendingCheckpointID = -1;
-
-
- public BarrierTracker(InputGate inputGate) {
- this.inputGate = inputGate;
- this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.pendingCheckpoints = new ArrayDeque<CheckpointBarrierCount>();
- }
-
- @Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
- while (true) {
- BufferOrEvent next = inputGate.getNextBufferOrEvent();
- if (next == null) {
- return null;
- }
- else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
- return next;
- }
- else {
- processBarrier((CheckpointBarrier) next.getEvent());
- }
- }
- }
-
- @Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
- }
- else {
- throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
- }
- }
-
- @Override
- public void cleanup() {
- pendingCheckpoints.clear();
- }
-
- @Override
- public boolean isEmpty() {
- return pendingCheckpoints.isEmpty();
- }
-
- private void processBarrier(CheckpointBarrier receivedBarrier) {
- // fast path for single channel trackers
- if (totalNumberOfInputChannels == 1) {
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
- }
- return;
- }
-
- // general path for multiple input channels
- final long barrierId = receivedBarrier.getId();
-
- // find the checkpoint barrier in the queue of pending barriers
- CheckpointBarrierCount cbc = null;
- int pos = 0;
-
- for (CheckpointBarrierCount next : pendingCheckpoints) {
- if (next.checkpointId == barrierId) {
- cbc = next;
- break;
- }
- pos++;
- }
-
- if (cbc != null) {
- // add one to the count for that barrier and check for completion
- int numBarriersNew = cbc.incrementBarrierCount();
- if (numBarriersNew == totalNumberOfInputChannels) {
- // checkpoint can be triggered
- // first, remove this checkpoint and all prior pending
- // checkpoints (which are now subsumed)
- for (int i = 0; i <= pos; i++) {
- pendingCheckpoints.pollFirst();
- }
-
- // notify the listener
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
- }
- }
- }
- else {
- // first barrier for that checkpoint ID
- // add it only if it is newer than the latest checkpoint.
- // if it is not newer than the latest checkpoint ID, then there cannot be a
- // successful checkpoint for that ID anyways
- if (barrierId > latestPendingCheckpointID) {
- latestPendingCheckpointID = barrierId;
- pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
-
- // make sure we do not track too many checkpoints
- if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
- pendingCheckpoints.pollFirst();
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Simple class for a checkpoint ID with a barrier counter.
- */
- private static final class CheckpointBarrierCount {
-
- private final long checkpointId;
-
- private int barrierCount;
-
- private CheckpointBarrierCount(long checkpointId) {
- this.checkpointId = checkpointId;
- this.barrierCount = 1;
- }
-
- public int incrementBarrierCount() {
- return ++barrierCount;
- }
-
- @Override
- public int hashCode() {
- return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof CheckpointBarrierCount) {
- CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
- return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
deleted file mode 100644
index be3c9af..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-
-public class BlockingQueueBroker extends Broker<BlockingQueue<?>> {
-
- /** Singleton instance */
- public static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
-
- /** Cannot instantiate */
- private BlockingQueueBroker() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
deleted file mode 100644
index cabed14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.util.StringUtils;
-
-/**
- * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file.
- * After a number of elements have been spilled, the spiller can "roll over": It presents the spilled
- * elements as a readable sequence, and opens a new spill file.
- *
- * <p>This implementation buffers data effectively in the OS cache, which gracefully extends to the
- * disk. Most data is written and re-read milliseconds later. The file is deleted after the read.
- * Consequently, in most cases, the data will never actually hit the physical disks.</p>
- *
- * <p>IMPORTANT: The SpilledBufferOrEventSequences created by this spiller all reuse the same
- * reading memory (to reduce overhead) and can consequently not be read concurrently.</p>
- */
-public class BufferSpiller {
-
- /** The counter that selects the next directory to spill into */
- private static final AtomicInteger DIRECTORY_INDEX = new AtomicInteger(0);
-
- /** The size of the buffer with which data is read back in */
- private static final int READ_BUFFER_SIZE = 1024 * 1024;
-
- /** The directory to spill to */
- private final File tempDir;
-
- /** The name prefix for spill files */
- private final String spillFilePrefix;
-
- /** The buffer used for bulk reading data (used in the SpilledBufferOrEventSequence) */
- private final ByteBuffer readBuffer;
-
- /** The buffer that encodes the spilled header */
- private final ByteBuffer headBuffer;
-
- /** The reusable array that holds header and contents buffers */
- private final ByteBuffer[] sources;
-
- /** The file that we currently spill to */
- private File currentSpillFile;
-
- /** The channel of the file we currently spill to */
- private FileChannel currentChannel;
-
- /** The page size, to let this reader instantiate properly sized memory segments */
- private final int pageSize;
-
- /** A counter, to create numbered spill files */
- private int fileCounter;
-
- /** A flag to check whether the spiller has written since the last roll over */
- private boolean hasWritten;
-
- /**
- * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
- *
- * @param ioManager The I/O manager for access to the temp directories.
- * @param pageSize The page size used to re-create spilled buffers.
- * @throws IOException Thrown if the temp files for spilling cannot be initialized.
- */
- public BufferSpiller(IOManager ioManager, int pageSize) throws IOException {
- this.pageSize = pageSize;
-
- this.readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
- this.readBuffer.order(ByteOrder.LITTLE_ENDIAN);
-
- this.headBuffer = ByteBuffer.allocateDirect(16);
- this.headBuffer.order(ByteOrder.LITTLE_ENDIAN);
-
- this.sources = new ByteBuffer[] { this.headBuffer, null };
-
- File[] tempDirs = ioManager.getSpillingDirectories();
- this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length];
-
- byte[] rndBytes = new byte[32];
- new Random().nextBytes(rndBytes);
- this.spillFilePrefix = StringUtils.byteToHexString(rndBytes) + '.';
-
- // prepare for first contents
- createSpillingChannel();
- }
-
- /**
- * Adds a buffer or event to the sequence of spilled buffers and events.
- *
- * @param boe The buffer or event to add and spill.
- * @throws IOException Thrown, if the buffer or event could not be spilled.
- */
- public void add(BufferOrEvent boe) throws IOException {
- hasWritten = true;
- try {
- ByteBuffer contents;
- if (boe.isBuffer()) {
- Buffer buf = boe.getBuffer();
- contents = buf.getMemorySegment().wrap(0, buf.getSize());
- }
- else {
- contents = EventSerializer.toSerializedEvent(boe.getEvent());
- }
-
- headBuffer.clear();
- headBuffer.putInt(boe.getChannelIndex());
- headBuffer.putInt(contents.remaining());
- headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
- headBuffer.flip();
-
- sources[1] = contents;
- currentChannel.write(sources);
- }
- finally {
- if (boe.isBuffer()) {
- boe.getBuffer().recycle();
- }
- }
- }
-
- /**
- * Starts a new sequence of spilled buffers and events and returns the current sequence of spilled buffers
- * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
- * last call to this method.
- *
- * <p>NOTE: The SpilledBufferOrEventSequences created by this method all reuse the same
- * reading memory (to reduce overhead) and can consequently not be read concurrently with each other.
- * To create a sequence that can be read concurrently with the previous SpilledBufferOrEventSequence, use the
- * {@link #rollOverWithNewBuffer()} method.</p>
- *
- * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
- * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
- * file could be created.
- */
- public SpilledBufferOrEventSequence rollOver() throws IOException {
- return rollOverInternal(false);
- }
-
- /**
- * Starts a new sequence of spilled buffers and events and returns the current sequence of spilled buffers
- * for reading. This method returns {@code null}, if nothing was added since the creation of the spiller, or the
- * last call to this method.
- *
- * <p>The SpilledBufferOrEventSequence returned by this method is safe for concurrent consumption with
- * any previously returned sequence.</p>
- *
- * @return The readable sequence of spilled buffers and events, or 'null', if nothing was added.
- * @throws IOException Thrown, if the readable sequence could not be created, or no new spill
- * file could be created.
- */
- public SpilledBufferOrEventSequence rollOverWithNewBuffer() throws IOException {
- return rollOverInternal(true);
- }
-
- private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
- if (!hasWritten) {
- return null;
- }
-
- ByteBuffer buf;
- if (newBuffer) {
- buf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
- buf.order(ByteOrder.LITTLE_ENDIAN);
- } else {
- buf = readBuffer;
- }
-
- // create a reader for the spilled data
- currentChannel.position(0L);
- SpilledBufferOrEventSequence seq =
- new SpilledBufferOrEventSequence(currentSpillFile, currentChannel, buf, pageSize);
-
- // create ourselves a new spill file
- createSpillingChannel();
-
- hasWritten = false;
- return seq;
- }
-
- /**
- * Cleans up the current spilling channel and file.
- *
- * Does not clean up the SpilledBufferOrEventSequences generated by calls to
- * {@link #rollOver()}.
- *
- * @throws IOException Thrown if channel closing or file deletion fail.
- */
- public void close() throws IOException {
- currentChannel.close();
- if (!currentSpillFile.delete()) {
- throw new IOException("Cannot delete spill file");
- }
- }
-
- // ------------------------------------------------------------------------
- // For testing
- // ------------------------------------------------------------------------
-
- File getCurrentSpillFile() {
- return currentSpillFile;
- }
-
- FileChannel getCurrentChannel() {
- return currentChannel;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("resource")
- private void createSpillingChannel() throws IOException {
- currentSpillFile = new File(tempDir, spillFilePrefix + (fileCounter++) +".buffer");
- currentChannel = new RandomAccessFile(currentSpillFile, "rw").getChannel();
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * This class represents a sequence of spilled buffers and events, created by the
- * {@link BufferSpiller}. The sequence of buffers and events can be read back using the
- * method {@link #getNext()}.
- */
- public static class SpilledBufferOrEventSequence {
-
- /** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte) */
- private static final int HEADER_LENGTH = 9;
-
- /** The file containing the data */
- private final File file;
-
- /** The file channel to draw the data from */
- private final FileChannel fileChannel;
-
- /** The byte buffer for bulk reading */
- private final ByteBuffer buffer;
-
- /** The page size to instantiate properly sized memory segments */
- private final int pageSize;
-
- /** Flag to track whether the sequence has been opened already */
- private boolean opened = false;
-
- /**
- * Create a reader that reads a sequence of spilled buffers and events.
- *
- * @param file The file with the data.
- * @param fileChannel The file channel to read the data from.
- * @param buffer The buffer used for bulk reading.
- * @param pageSize The page size to use for the created memory segments.
- */
- SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
- this.file = file;
- this.fileChannel = fileChannel;
- this.buffer = buffer;
- this.pageSize = pageSize;
- }
-
- /**
- * Initializes the sequence for reading.
- * This method needs to be called before the first call to {@link #getNext()}. Otherwise
- * the results of {@link #getNext()} are not predictable.
- */
- public void open() {
- if (!opened) {
- opened = true;
- buffer.position(0);
- buffer.limit(0);
- }
- }
-
- /**
- * Gets the next BufferOrEvent from the spilled sequence, or {@code null}, if the
- * sequence is exhausted.
- *
- * @return The next BufferOrEvent from the spilled sequence, or {@code null} (end of sequence).
- * @throws IOException Thrown, if the reads failed, or if the byte stream is corrupt.
- */
- public BufferOrEvent getNext() throws IOException {
- if (buffer.remaining() < HEADER_LENGTH) {
- buffer.compact();
-
- while (buffer.position() < HEADER_LENGTH) {
- if (fileChannel.read(buffer) == -1) {
- if (buffer.position() == 0) {
- // no trailing data
- return null;
- } else {
- throw new IOException("Found trailing incomplete buffer or event");
- }
- }
- }
-
- buffer.flip();
- }
-
- final int channel = buffer.getInt();
- final int length = buffer.getInt();
- final boolean isBuffer = buffer.get() == 0;
-
-
- if (isBuffer) {
- // deserialize buffer
- if (length > pageSize) {
- throw new IOException(String.format(
- "Spilled buffer (%d bytes) is larger than page size of (%d bytes)", length, pageSize));
- }
-
- MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(pageSize);
-
- int segPos = 0;
- int bytesRemaining = length;
-
- while (true) {
- int toCopy = Math.min(buffer.remaining(), bytesRemaining);
- if (toCopy > 0) {
- seg.put(segPos, buffer, toCopy);
- segPos += toCopy;
- bytesRemaining -= toCopy;
- }
-
- if (bytesRemaining == 0) {
- break;
- }
- else {
- buffer.clear();
- if (fileChannel.read(buffer) == -1) {
- throw new IOException("Found trailing incomplete buffer");
- }
- buffer.flip();
- }
- }
-
-
- Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
- buf.setSize(length);
-
- return new BufferOrEvent(buf, channel);
- }
- else {
- // deserialize event
- if (length > buffer.capacity() - HEADER_LENGTH) {
- throw new IOException("Event is too large");
- }
-
- if (buffer.remaining() < length) {
- buffer.compact();
-
- while (buffer.position() < length) {
- if (fileChannel.read(buffer) == -1) {
- throw new IOException("Found trailing incomplete event");
- }
- }
-
- buffer.flip();
- }
-
- int oldLimit = buffer.limit();
- buffer.limit(buffer.position() + length);
- AbstractEvent evt = EventSerializer.fromSerializedEvent(buffer, getClass().getClassLoader());
- buffer.limit(oldLimit);
-
- return new BufferOrEvent(evt, channel);
- }
- }
-
- /**
- * Cleans up all file resources held by this spilled sequence.
- *
- * @throws IOException Thrown, if file channel closing or file deletion fail.
- */
- public void cleanup() throws IOException {
- fileChannel.close();
- if (!file.delete()) {
- throw new IOException("Cannot remove temp file for stream alignment writer");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
deleted file mode 100644
index 791fd40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-
-/**
- * The CheckpointBarrierHandler reacts to checkpoint barriers arriving from the input channels.
- * Different implementations may either simply track barriers, or block certain inputs on
- * barriers.
- */
-public interface CheckpointBarrierHandler {
-
- /**
- * Returns the next {@link BufferOrEvent} that the operator may consume.
- * This call blocks until the next BufferOrEvent is available, or until the stream
- * has been determined to be finished.
- *
- * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
- * @throws java.io.IOException Thrown, if the network or local disk I/O fails.
- * @throws java.lang.InterruptedException Thrown, if the thread is interrupted while blocking during
- * waiting for the next BufferOrEvent to become available.
- */
- BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
-
- /**
- * Registers the given event handler to be notified on successful checkpoints.
- *
- * @param checkpointHandler The handler to register.
- */
- void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
-
- /**
- * Cleans up all internally held resources.
- *
- * @throws IOException Thrown, if the cleanup of I/O resources failed.
- */
- void cleanup() throws IOException;
-
- /**
- * Checks whether the barrier handler currently holds no internally buffered data.
- * @return True, if no data is buffered internally, false otherwise.
- */
- boolean isEmpty();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
deleted file mode 100644
index 01e997d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
-
- private OutputSelectorWrapper<OUT> outputSelectorWrapper;
-
- private ArrayList<Output<StreamRecord<OUT>>> allOutputs;
-
- public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
- this.outputSelectorWrapper = outputSelectorWrapper;
- allOutputs = new ArrayList<Output<StreamRecord<OUT>>>();
- }
-
- public void addCollector(Output<StreamRecord<OUT>> output, StreamEdge edge) {
- outputSelectorWrapper.addCollector(output, edge);
- allOutputs.add(output);
- }
-
- @Override
- public void collect(StreamRecord<OUT> record) {
- for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
- output.collect(record);
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- for (Output<?> output : allOutputs) {
- output.emitWatermark(mark);
- }
- }
-
- @Override
- public void close() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
deleted file mode 100644
index f11e9a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-
-/**
- * Utility for dealing with input gates. This will either just return
- * the single {@link InputGate} that was passed in or create a {@link UnionInputGate} if several
- * {@link InputGate input gates} are given.
- */
-public class InputGateUtil {
-
- public static InputGate createInputGate(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2) {
- List<InputGate> gates = new ArrayList<InputGate>(inputGates1.size() + inputGates2.size());
- gates.addAll(inputGates1);
- gates.addAll(inputGates2);
- return createInputGate(gates.toArray(new InputGate[gates.size()]));
- }
-
- public static InputGate createInputGate(InputGate[] inputGates) {
- if (inputGates.length <= 0) {
- throw new RuntimeException("No such input gate.");
- }
-
- if (inputGates.length < 2) {
- return inputGates[0];
- } else {
- return new UnionInputGate(inputGates);
- }
- }
-
- /**
- * Private constructor to prevent instantiation.
- */
- private InputGateUtil() {
- throw new RuntimeException();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
deleted file mode 100644
index 34e5800..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
- */
-public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
-
- private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
-
- private SerializationDelegate<StreamElement> serializationDelegate;
-
-
- @SuppressWarnings("unchecked")
- public RecordWriterOutput(
- StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
- TypeSerializer<OUT> outSerializer,
- boolean enableWatermarkMultiplexing) {
-
- checkNotNull(recordWriter);
-
- // generic hack: cast the writer to generic Object type so we can use it
- // with multiplexed records and watermarks
- this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
- (StreamRecordWriter<?>) recordWriter;
-
- TypeSerializer<StreamElement> outRecordSerializer;
- if (enableWatermarkMultiplexing) {
- outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
- } else {
- outRecordSerializer = (TypeSerializer<StreamElement>)
- (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
- }
-
- if (outSerializer != null) {
- serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
- }
- }
-
- @Override
- public void collect(StreamRecord<OUT> record) {
- serializationDelegate.setInstance(record);
-
- try {
- recordWriter.emit(serializationDelegate);
- }
- catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- serializationDelegate.setInstance(mark);
-
- try {
- recordWriter.broadcastEmit(serializationDelegate);
- }
- catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
- recordWriter.broadcastEvent(barrier);
- }
-
-
- public void flush() throws IOException {
- recordWriter.flush();
- }
-
- @Override
- public void close() {
- recordWriter.close();
- }
-
- public void clearBuffers() {
- recordWriter.clearBuffers();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
deleted file mode 100644
index e131cda..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-/**
- * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
- *
- * <p>
- * This also keeps track of {@link Watermark} events and forwards them to event subscribers
- * once the {@link Watermark} from all inputs advances.
- *
- * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
- * object. This ensures that we don't call methods on a {@link OneInputStreamOperator} concurrently
- * with the timer callback or other things.
- *
- * @param <IN> The type of the record that can be read with this record reader.
- */
-public class StreamInputProcessor<IN> {
-
- private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
-
- private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
-
- private final CheckpointBarrierHandler barrierHandler;
-
- // We need to keep track of the channel from which a buffer came, so that we can
- // appropriately map the watermarks to input channels
- private int currentChannel = -1;
-
- private boolean isFinished;
-
-
-
- private final long[] watermarks;
- private long lastEmittedWatermark;
-
- private final DeserializationDelegate<StreamElement> deserializationDelegate;
-
- @SuppressWarnings("unchecked")
- public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
- EventListener<CheckpointBarrier> checkpointListener,
- CheckpointingMode checkpointMode,
- IOManager ioManager,
- boolean enableWatermarkMultiplexing) throws IOException {
-
- InputGate inputGate = InputGateUtil.createInputGate(inputGates);
-
- if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
- }
- else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
- this.barrierHandler = new BarrierTracker(inputGate);
- }
- else {
- throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
- }
-
- if (checkpointListener != null) {
- this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
- }
-
- if (enableWatermarkMultiplexing) {
- MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
- this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(ser);
- } else {
- StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
- this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>)
- (NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
- }
-
- // Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
-
- for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
- }
-
- watermarks = new long[inputGate.getNumberOfInputChannels()];
- for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
- watermarks[i] = Long.MIN_VALUE;
- }
- lastEmittedWatermark = Long.MIN_VALUE;
- }
-
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
- if (isFinished) {
- return false;
- }
-
- while (true) {
- if (currentRecordDeserializer != null) {
- DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
-
- if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycle();
- currentRecordDeserializer = null;
- }
-
- if (result.isFullRecord()) {
- StreamElement recordOrWatermark = deserializationDelegate.getInstance();
-
- if (recordOrWatermark.isWatermark()) {
- long watermarkMillis = recordOrWatermark.asWatermark().getTimestamp();
- if (watermarkMillis > watermarks[currentChannel]) {
- watermarks[currentChannel] = watermarkMillis;
- long newMinWatermark = Long.MAX_VALUE;
- for (long watermark : watermarks) {
- newMinWatermark = Math.min(watermark, newMinWatermark);
- }
- if (newMinWatermark > lastEmittedWatermark) {
- lastEmittedWatermark = newMinWatermark;
- synchronized (lock) {
- streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
- }
- }
- }
- continue;
- } else {
- // now we can do the actual processing
- StreamRecord<IN> record = recordOrWatermark.asRecord();
- synchronized (lock) {
- streamOperator.setKeyContextElement(record);
- streamOperator.processElement(record);
- }
- return true;
- }
- }
- }
-
- final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
- if (bufferOrEvent != null) {
- if (bufferOrEvent.isBuffer()) {
- currentChannel = bufferOrEvent.getChannelIndex();
- currentRecordDeserializer = recordDeserializers[currentChannel];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
- }
- else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
- if (event.getClass() != EndOfPartitionEvent.class) {
- throw new IOException("Unexpected event: " + event);
- }
- }
- }
- else {
- isFinished = true;
- if (!barrierHandler.isEmpty()) {
- throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
- }
- return false;
- }
- }
- }
-
- public void setReporter(AccumulatorRegistry.Reporter reporter) {
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- deserializer.setReporter(reporter);
- }
- }
-
- public void cleanup() throws IOException {
- // clear the buffers first. this part should not ever fail
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
-
- // cleanup the barrier handler resources
- barrierHandler.cleanup();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
deleted file mode 100644
index 8dcaad8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * This record writer keeps data in buffers at most for a certain timeout. It spawns a separate thread
- * that flushes the outputs in a defined interval, to make sure data does not linger in the buffers for too long.
- *
- * @param <T> The type of elements written.
- */
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
-
- /** Default name for the output flush thread, if no name with a task reference is given */
- private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
-
-
- /** The thread that periodically flushes the output, to give an upper latency bound */
- private final OutputFlusher outputFlusher;
-
- /** Flag indicating whether the output should be flushed after every element */
- private final boolean flushAlways;
-
- /** The exception encountered in the flushing thread */
- private Throwable flusherException;
-
-
-
- public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
- this(writer, channelSelector, timeout, null);
- }
-
- public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
- long timeout, String taskName) {
-
- super(writer, channelSelector);
-
- checkArgument(timeout >= -1);
-
- if (timeout == -1) {
- flushAlways = false;
- outputFlusher = null;
- }
- else if (timeout == 0) {
- flushAlways = true;
- outputFlusher = null;
- }
- else {
- flushAlways = false;
- String threadName = taskName == null ?
- DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
-
- outputFlusher = new OutputFlusher(threadName, timeout);
- outputFlusher.start();
- }
- }
-
- @Override
- public void emit(T record) throws IOException, InterruptedException {
- checkErroneous();
- super.emit(record);
- if (flushAlways) {
- flush();
- }
- }
-
- @Override
- public void broadcastEmit(T record) throws IOException, InterruptedException {
- checkErroneous();
- super.broadcastEmit(record);
- if (flushAlways) {
- flush();
- }
- }
-
- /**
- * Closes the writer. This stops the flushing thread (if there is one).
- */
- public void close() {
- // make sure we terminate the thread in any case
- if (outputFlusher != null) {
- outputFlusher.terminate();
- try {
- outputFlusher.join();
- }
- catch (InterruptedException e) {
- // ignore on close
- }
- }
- }
-
- /**
- * Notifies the writer that the output flusher thread encountered an exception.
- *
- * @param t The exception to report.
- */
- private void notifyFlusherException(Throwable t) {
- if (this.flusherException == null) {
- this.flusherException = t;
- }
- }
-
- private void checkErroneous() throws IOException {
- if (flusherException != null) {
- throw new IOException("An exception happened while flushing the outputs", flusherException);
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
- *
- * The thread is daemonic, because it is only a utility thread.
- */
- private class OutputFlusher extends Thread {
-
- private final long timeout;
-
- private volatile boolean running = true;
-
-
- OutputFlusher(String name, long timeout) {
- super(name);
- setDaemon(true);
- this.timeout = timeout;
- }
-
- public void terminate() {
- running = false;
- interrupt();
- }
-
- @Override
- public void run() {
- try {
- while (running) {
- try {
- Thread.sleep(timeout);
- }
- catch (InterruptedException e) {
- // propagate this if we are still running, because it should not happen
- // in that case
- if (running) {
- throw new Exception(e);
- }
- }
-
- // any errors here should let the thread come to a halt and be
- // recognized by the writer
- flush();
- }
- }
- catch (Throwable t) {
- notifyFlusherException(t);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
deleted file mode 100644
index 882037e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-/**
- * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}.
- *
- * <p>
- * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers
- * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances.
- *
- * <p>
- * Forwarding elements or watermarks must be protected by synchronizing on the given lock
- * object. This ensures that we don't call methods on a {@link TwoInputStreamOperator} concurrently
- * with the timer callback or other things.
- *
- * @param <IN1> The type of the records that arrive on the first input
- * @param <IN2> The type of the records that arrive on the second input
- */
-public class StreamTwoInputProcessor<IN1, IN2> {
-
- private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
-
- private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
-
- // We need to keep track of the channel from which a buffer came, so that we can
- // appropriately map the watermarks to input channels
- private int currentChannel = -1;
-
- private boolean isFinished;
-
- private final CheckpointBarrierHandler barrierHandler;
-
- private final long[] watermarks1;
- private long lastEmittedWatermark1;
-
- private final long[] watermarks2;
- private long lastEmittedWatermark2;
-
- private final int numInputChannels1;
-
- private final DeserializationDelegate<StreamElement> deserializationDelegate1;
- private final DeserializationDelegate<StreamElement> deserializationDelegate2;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- public StreamTwoInputProcessor(
- Collection<InputGate> inputGates1,
- Collection<InputGate> inputGates2,
- TypeSerializer<IN1> inputSerializer1,
- TypeSerializer<IN2> inputSerializer2,
- EventListener<CheckpointBarrier> checkpointListener,
- CheckpointingMode checkpointMode,
- IOManager ioManager,
- boolean enableWatermarkMultiplexing) throws IOException {
-
- final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
-
- if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
- }
- else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
- this.barrierHandler = new BarrierTracker(inputGate);
- }
- else {
- throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
- }
-
- if (checkpointListener != null) {
- this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
- }
-
- if (enableWatermarkMultiplexing) {
- MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
- this.deserializationDelegate1 = new NonReusingDeserializationDelegate<StreamElement>(ser);
- }
- else {
- StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
- this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>)
- (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
- }
-
- if (enableWatermarkMultiplexing) {
- MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
- this.deserializationDelegate2 = new NonReusingDeserializationDelegate<StreamElement>(ser);
- }
- else {
- StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
- this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>)
- (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
- }
-
- // Initialize one deserializer per input channel
- this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
-
- for (int i = 0; i < recordDeserializers.length; i++) {
- recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
- }
-
- // determine which unioned channels belong to input 1 and which belong to input 2
- int numInputChannels1 = 0;
- for (InputGate gate: inputGates1) {
- numInputChannels1 += gate.getNumberOfInputChannels();
- }
-
- this.numInputChannels1 = numInputChannels1;
- int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
-
- watermarks1 = new long[numInputChannels1];
- Arrays.fill(watermarks1, Long.MIN_VALUE);
- lastEmittedWatermark1 = Long.MIN_VALUE;
-
- watermarks2 = new long[numInputChannels2];
- Arrays.fill(watermarks2, Long.MIN_VALUE);
- lastEmittedWatermark2 = Long.MIN_VALUE;
- }
-
- @SuppressWarnings("unchecked")
- public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator, Object lock) throws Exception {
- if (isFinished) {
- return false;
- }
-
- while (true) {
- if (currentRecordDeserializer != null) {
- DeserializationResult result;
- if (currentChannel < numInputChannels1) {
- result = currentRecordDeserializer.getNextRecord(deserializationDelegate1);
- } else {
- result = currentRecordDeserializer.getNextRecord(deserializationDelegate2);
- }
-
- if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycle();
- currentRecordDeserializer = null;
- }
-
- if (result.isFullRecord()) {
- if (currentChannel < numInputChannels1) {
- StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
- if (recordOrWatermark.isWatermark()) {
- handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel, lock);
- continue;
- }
- else {
- synchronized (lock) {
- streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
- }
- return true;
-
- }
- }
- else {
- StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
- if (recordOrWatermark.isWatermark()) {
- handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
- continue;
- }
- else {
- synchronized (lock) {
- streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
- }
- return true;
- }
- }
- }
- }
-
- final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
- if (bufferOrEvent != null) {
-
- if (bufferOrEvent.isBuffer()) {
- currentChannel = bufferOrEvent.getChannelIndex();
- currentRecordDeserializer = recordDeserializers[currentChannel];
- currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-
- } else {
- // Event received
- final AbstractEvent event = bufferOrEvent.getEvent();
- if (event.getClass() != EndOfPartitionEvent.class) {
- throw new IOException("Unexpected event: " + event);
- }
- }
- }
- else {
- isFinished = true;
- if (!barrierHandler.isEmpty()) {
- throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
- }
- return false;
- }
- }
- }
-
- private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex, Object lock) throws Exception {
- if (channelIndex < numInputChannels1) {
- long watermarkMillis = mark.getTimestamp();
- if (watermarkMillis > watermarks1[channelIndex]) {
- watermarks1[channelIndex] = watermarkMillis;
- long newMinWatermark = Long.MAX_VALUE;
- for (long wm : watermarks1) {
- newMinWatermark = Math.min(wm, newMinWatermark);
- }
- if (newMinWatermark > lastEmittedWatermark1) {
- lastEmittedWatermark1 = newMinWatermark;
- synchronized (lock) {
- operator.processWatermark1(new Watermark(lastEmittedWatermark1));
- }
- }
- }
- } else {
- channelIndex = channelIndex - numInputChannels1;
- long watermarkMillis = mark.getTimestamp();
- if (watermarkMillis > watermarks2[channelIndex]) {
- watermarks2[channelIndex] = watermarkMillis;
- long newMinWatermark = Long.MAX_VALUE;
- for (long wm : watermarks2) {
- newMinWatermark = Math.min(wm, newMinWatermark);
- }
- if (newMinWatermark > lastEmittedWatermark2) {
- lastEmittedWatermark2 = newMinWatermark;
- synchronized (lock) {
- operator.processWatermark2(new Watermark(lastEmittedWatermark2));
- }
- }
- }
- }
- }
-
- public void setReporter(AccumulatorRegistry.Reporter reporter) {
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- deserializer.setReporter(reporter);
- }
- }
-
- public void cleanup() throws IOException {
- // clear the buffers first. this part should not ever fail
- for (RecordDeserializer<?> deserializer : recordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
-
- // cleanup the barrier handler resources
- barrierHandler.cleanup();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
deleted file mode 100644
index 793e87e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-
-import java.io.IOException;
-
-public interface StreamingReader extends ReaderBase {
-
- void cleanup() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
deleted file mode 100644
index 7020758..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * An operator that can sort a stream based on timestamps. Arriving elements will be put into
- * buckets based on their timestamp. Sorting and emission of sorted elements happens once
- * the watermark passes the end of a bucket.
- *
- * @param <T> The type of the elements on which this operator works.
- */
-public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
- private static final long serialVersionUID = 1L;
-
- private long granularity;
-
- private transient Map<Long, List<StreamRecord<T>>> buckets;
-
- /**
- * Creates a new sorting operator that creates buckets with the given interval.
- *
- * @param interval The size (in time) of one bucket.
- */
- public BucketStreamSortOperator(long interval) {
- this.granularity = interval;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- buckets = new HashMap<>();
-
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void processElement(StreamRecord<T> record) throws Exception {
- long bucketId = record.getTimestamp() - (record.getTimestamp() % granularity);
- List<StreamRecord<T>> bucket = buckets.get(bucketId);
- if (bucket == null) {
- bucket = new ArrayList<>();
- buckets.put(bucketId, bucket);
- }
- bucket.add(record);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- long maxBucketId = mark.getTimestamp() - (mark.getTimestamp() % granularity);
- Set<Long> toRemove = new HashSet<>();
- for (Map.Entry<Long, List<StreamRecord<T>>> bucket: buckets.entrySet()) {
- if (bucket.getKey() < maxBucketId) {
- Collections.sort(bucket.getValue(), new Comparator<StreamRecord<T>>() {
- @Override
- public int compare(StreamRecord<T> o1, StreamRecord<T> o2) {
- return (int) (o1.getTimestamp() - o2.getTimestamp());
- }
- });
- for (StreamRecord<T> r: bucket.getValue()) {
- output.collect(r);
- }
- toRemove.add(bucket.getKey());
- }
- }
-
- for (Long l: toRemove) {
- buckets.remove(l);
- }
-
- output.emitWatermark(mark);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
deleted file mode 100644
index 6e51a49..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for extracting timestamps
- * from user elements and assigning them as the internal timestamp of the {@link StreamRecord}.
- *
- * @param <T> The type of the input elements
- */
-public class ExtractTimestampsOperator<T>
- extends AbstractUdfStreamOperator<T, TimestampExtractor<T>>
- implements OneInputStreamOperator<T, T>, Triggerable {
-
- private static final long serialVersionUID = 1L;
-
- transient long watermarkInterval;
-
- transient long currentWatermark;
-
- public ExtractTimestampsOperator(TimestampExtractor<T> extractor) {
- super(extractor);
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
- if (watermarkInterval > 0) {
- registerTimer(System.currentTimeMillis() + watermarkInterval, this);
- }
-
- currentWatermark = Long.MIN_VALUE;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
-
- // emit a final +Inf watermark, just like the sources
- output.emitWatermark(new Watermark(Long.MAX_VALUE));
- }
-
- @Override
- public void processElement(StreamRecord<T> element) throws Exception {
- long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
- output.collect(element.replace(element.getValue(), newTimestamp));
- long watermark = userFunction.extractWatermark(element.getValue(), newTimestamp);
- if (watermark > currentWatermark) {
- currentWatermark = watermark;
- output.emitWatermark(new Watermark(currentWatermark));
- }
- }
-
- @Override
- public void trigger(long timestamp) throws Exception {
- // register next timer
- registerTimer(System.currentTimeMillis() + watermarkInterval, this);
- long lastWatermark = currentWatermark;
- currentWatermark = userFunction.getCurrentWatermark();
-
- if (currentWatermark > lastWatermark) {
- // emit watermark
- output.emitWatermark(new Watermark(currentWatermark));
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- // ignore them, since we are basically a watermark source
- }
-}