You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:40 UTC
[4/7] flink git commit: [FLINK-4391] Polish asynchronous I/O
operations
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
new file mode 100644
index 0000000..5a2e43c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * Interface for {@link StreamOperator} actions.
+ */
+public interface OperatorActions {
+
+ /**
+ * Fail the respective stream operator with the given throwable.
+ *
+ * @param throwable to fail the stream operator with
+ */
+ void failOperator(Throwable throwable);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
new file mode 100644
index 0000000..8088bf0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import java.util.Collection;
+
+/**
+ * {@link AsyncResult} sub class for asynchronous result collections.
+ *
+ * @param <T> Type of the collection elements.
+ */
+public interface AsyncCollectionResult<T> extends AsyncResult {
+
+ boolean hasTimestamp();
+
+ long getTimestamp();
+
+ /**
+ * Return the asynchronous result collection.
+ *
+ * @return the asynchronous result collection
+ * @throws Exception if the asynchronous result collection could not be completed
+ */
+ Collection<T> get() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
new file mode 100644
index 0000000..1a99928
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * Asynchronous result returned by the {@link StreamElementQueue}. The asynchronous result can
+ * either be a {@link Watermark} or a collection of new output elements produced by the
+ * {@link AsyncFunction}.
+ */
+public interface AsyncResult {
+
+ /**
+ * True if the async result is a {@link Watermark}; otherwise false.
+ *
+ * @return True if the async result is a {@link Watermark}; otherwise false.
+ */
+ boolean isWatermark();
+
+ /**
+ * True fi the async result is a collection of output elements; otherwise false.
+ *
+ * @return True if the async reuslt is a collection of output elements; otherwise false
+ */
+ boolean isResultCollection();
+
+ /**
+ * Return this async result as a async watermark result.
+ *
+ * @return this result as a {@link AsyncWatermarkResult}.
+ */
+ AsyncWatermarkResult asWatermark();
+
+ /**
+ * Return this async result as a async result collection.
+ *
+ * @param <T> Type of the result collection's elements
+ * @return this result as a {@link AsyncCollectionResult}.
+ */
+ <T> AsyncCollectionResult<T> asResultCollection();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
new file mode 100644
index 0000000..c19b520
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * {@link AsyncResult} subclass for asynchronous result {@link Watermark}.
+ */
+public interface AsyncWatermarkResult extends AsyncResult {
+ /**
+ * Get the resulting watermark.
+ *
+ * @return the asynchronous result watermark
+ */
+ Watermark getWatermark();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
new file mode 100644
index 0000000..2bbcb6c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue emits
+ * asynchronous results in the order in which the {@link StreamElementQueueEntry} have been added
+ * to the queue. Thus, even if the completion order can be arbitrary, the output order strictly
+ * follows the insertion order (element cannot overtake each other).
+ */
+public class OrderedStreamElementQueue implements StreamElementQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
+
+ /** Capacity of this queue */
+ private final int capacity;
+
+ /** Executor to run the onCompletion callback */
+ private final Executor executor;
+
+ /** Operator actions to signal a failure to the operator */
+ private final OperatorActions operatorActions;
+
+ /** Lock and conditions for the blocking queue */
+ private final ReentrantLock lock;
+ private final Condition notFull;
+ private final Condition headIsCompleted;
+
+ /** Queue for the inserted StreamElementQueueEntries */
+ private final ArrayDeque<StreamElementQueueEntry<?>> queue;
+
+ public OrderedStreamElementQueue(
+ int capacity,
+ Executor executor,
+ OperatorActions operatorActions) {
+
+ Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
+ this.capacity = capacity;
+
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+
+ this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+ this.lock = new ReentrantLock(false);
+ this.headIsCompleted = lock.newCondition();
+ this.notFull = lock.newCondition();
+
+ this.queue = new ArrayDeque<>(capacity);
+ }
+
+ @Override
+ public AsyncResult peekBlockingly() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (queue.isEmpty() || !queue.peek().isDone()) {
+ headIsCompleted.await();
+ }
+
+ LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
+ "({}/{}).", queue.size(), capacity);
+
+ return queue.peek();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public AsyncResult poll() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (queue.isEmpty() || !queue.peek().isDone()) {
+ headIsCompleted.await();
+ }
+
+ notFull.signalAll();
+
+ LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
+ "({}/{}).", queue.size() - 1, capacity);
+
+ return queue.poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];
+
+ array = queue.toArray(array);
+
+ return Arrays.asList(array);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return queue.isEmpty();
+ }
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
+ @Override
+ public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (queue.size() >= capacity) {
+ notFull.await();
+ }
+
+ addEntry(streamElementQueueEntry);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (queue.size() < capacity) {
+ addEntry(streamElementQueueEntry);
+
+ LOG.debug("Put element into ordered stream element queue. New filling degree " +
+ "({}/{}).", queue.size(), capacity);
+
+ return true;
+ } else {
+ LOG.debug("Failed to put element into ordered stream element queue because it " +
+ "was full ({}/{}).", queue.size(), capacity);
+
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method
+ * registers a onComplete callback which is triggered once the given queue entry is completed.
+ *
+ * @param streamElementQueueEntry to be inserted
+ * @param <T> Type of the stream element queue entry's result
+ */
+ private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
+ assert(lock.isHeldByCurrentThread());
+
+ queue.addLast(streamElementQueueEntry);
+
+ streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
+ @Override
+ public void accept(StreamElementQueueEntry<T> value) {
+ try {
+ onCompleteHandler(value);
+ } catch (InterruptedException e) {
+ // we got interrupted. This indicates a shutdown of the executor
+ LOG.debug("AsyncBufferEntry could not be properly completed because the " +
+ "executor thread has been interrupted.", e);
+ } catch (Throwable t) {
+ operatorActions.failOperator(new Exception("Could not complete the " +
+ "stream element queue entry: " + value + '.', t));
+ }
+ }
+ }, executor);
+ }
+
+ /**
+ * Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the
+ * case, then notify the consumer thread about a new consumable entry.
+ *
+ * @param streamElementQueueEntry which has been completed
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (!queue.isEmpty() && queue.peek().isDone()) {
+ LOG.debug("Signal ordered stream element queue has completed head element.");
+ headIsCompleted.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
new file mode 100644
index 0000000..1a2c4a8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+
+import java.util.Collection;
+
+/**
+ * Interface for blocking stream element queues for the {@link AsyncWaitOperator}.
+ */
+public interface StreamElementQueue {
+
+ /**
+ * Put the given element in the queue if capacity is left. If not, then block until this is
+ * the case.
+ *
+ * @param streamElementQueueEntry to be put into the queue
+ * @param <T> Type of the entries future value
+ * @throws InterruptedException if the calling thread has been interrupted while waiting to
+ * insert the given element
+ */
+ <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
+
+ /**
+ * Try to put the given element in the queue. This operation succeeds if the queue has capacity
+ * left and fails if the queue is full.
+ *
+ * @param streamElementQueueEntry to be inserted
+ * @param <T> Type of the entries future value
+ * @return True if the entry could be inserted; otherwise false
+ * @throws InterruptedException if the calling thread has been interrupted while waiting to
+ * insert the given element
+ */
+ <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
+
+ /**
+ * Peek at the head of the queue and return the first completed {@link AsyncResult}. This
+ * operation is a blocking operation and only returns once a completed async result has been
+ * found.
+ *
+ * @return Completed {@link AsyncResult}
+ * @throws InterruptedException if the current thread has been interrupted while waiting for a
+ * completed async result.
+ */
+ AsyncResult peekBlockingly() throws InterruptedException;
+
+ /**
+ * Poll the first completed {@link AsyncResult} from the head of this queue. This operation is
+ * blocking and only returns once a completed async result has been found.
+ *
+ * @return Completed {@link AsyncResult} which has been removed from the queue
+ * @throws InterruptedException if the current thread has been interrupted while waiting for a
+ * completed async result.
+ */
+ AsyncResult poll() throws InterruptedException;
+
+ /**
+ * Return the collection of {@link StreamElementQueueEntry} currently contained in this queue.
+ *
+ * @return Collection of currently contained {@link StreamElementQueueEntry}.
+ * @throws InterruptedException if the current thread has been interrupted while retrieving the
+ * stream element queue entries of this queue.
+ */
+ Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;
+
+ /**
+ * True if the queue is empty; otherwise false.
+ *
+ * @return True if the queue is empty; otherwise false.
+ */
+ boolean isEmpty();
+
+ /**
+ * Return the size of the queue.
+ *
+ * @return The number of elements contained in this queue.
+ */
+ int size();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
new file mode 100644
index 0000000..06ebf3c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Entry class for the {@link StreamElementQueue}. The stream element queue entry stores the
+ * {@link StreamElement} for which the stream element queue entry has been instantiated.
+ * Furthermore, it allows to register callbacks for when the queue entry is completed.
+ *
+ * @param <T> Type of the result
+ */
+public abstract class StreamElementQueueEntry<T> implements AsyncResult {
+
+ /** Stream element */
+ private final StreamElement streamElement;
+
+ public StreamElementQueueEntry(StreamElement streamElement) {
+ this.streamElement = Preconditions.checkNotNull(streamElement);
+ }
+
+ public StreamElement getStreamElement() {
+ return streamElement;
+ }
+
+ /**
+ * True if the stream element queue entry has been completed; otherwise false.
+ *
+ * @return True if the stream element queue entry has been completed; otherwise false.
+ */
+ public boolean isDone() {
+ return getFuture().isDone();
+ }
+
+ /**
+ * Register the given complete function to be called once this queue entry has been completed.
+ *
+ * @param completeFunction to call when the queue entry has been completed
+ * @param executor to run the complete function
+ */
+ public void onComplete(
+ final AcceptFunction<StreamElementQueueEntry<T>> completeFunction,
+ Executor executor) {
+ final StreamElementQueueEntry<T> thisReference = this;
+
+ getFuture().thenAcceptAsync(new AcceptFunction<T>() {
+ @Override
+ public void accept(T value) {
+ completeFunction.accept(thisReference);
+ }
+ }, executor);
+ }
+
+ protected abstract Future<T> getFuture();
+
+ @Override
+ public final boolean isWatermark() {
+ return AsyncWatermarkResult.class.isAssignableFrom(getClass());
+ }
+
+ @Override
+ public final boolean isResultCollection() {
+ return AsyncCollectionResult.class.isAssignableFrom(getClass());
+ }
+
+ @Override
+ public final AsyncWatermarkResult asWatermark() {
+ return (AsyncWatermarkResult) this;
+ }
+
+ @Override
+ public final <T> AsyncCollectionResult<T> asResultCollection() {
+ return (AsyncCollectionResult<T>) this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
new file mode 100644
index 0000000..f0e707e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
+ * as the {@link AsyncCollector} implementation which is given to the {@link AsyncFunction}. The
+ * async function completes this class with a collection of results.
+ *
+ * @param <OUT> Type of the asynchronous collection result
+ */
+public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
+ implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {
+
+ /** Timestamp information */
+ private final boolean hasTimestamp;
+ private final long timestamp;
+
+ /** Future containing the collection result */
+ private final CompletableFuture<Collection<OUT>> resultFuture;
+
+ public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
+ super(streamRecord);
+
+ hasTimestamp = streamRecord.hasTimestamp();
+ timestamp = streamRecord.getTimestamp();
+
+ resultFuture = new FlinkCompletableFuture<>();
+ }
+
+ @Override
+ public boolean hasTimestamp() {
+ return hasTimestamp;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public Collection<OUT> get() throws Exception {
+ return resultFuture.get();
+ }
+
+ @Override
+ protected Future<Collection<OUT>> getFuture() {
+ return resultFuture;
+ }
+
+ @Override
+ public void collect(Collection<OUT> result) {
+ resultFuture.complete(result);
+ }
+
+ @Override
+ public void collect(Throwable error) {
+ resultFuture.completeExceptionally(error);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
new file mode 100644
index 0000000..603d8cc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue
+ * emits asynchronous results as soon as they are completed. Additionally it maintains the
+ * watermark-stream record order. This means that no stream record can be overtaken by a watermark
+ * and no watermark can overtake a stream record. However, stream records falling in the same
+ * segment between two watermarks can overtake each other (their emission order is not guaranteed).
+ */
+public class UnorderedStreamElementQueue implements StreamElementQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
+
+ /** Capacity of this queue */
+ private final int capacity;
+
+ /** Executor to run the onComplete callbacks */
+ private final Executor executor;
+
+ /** OperatorActions to signal the owning operator a failure */
+ private final OperatorActions operatorActions;
+
+ /** Queue of uncompleted stream element queue entries segmented by watermarks */
+ private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
+
+ /** Queue of completed stream element queue entries */
+ private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
+
+ /** First (chronologically oldest) uncompleted set of stream element queue entries */
+ private Set<StreamElementQueueEntry<?>> firstSet;
+
+ // Last (chronologically youngest) uncompleted set of stream element queue entries. New
+ // stream element queue entries are inserted into this set.
+ private Set<StreamElementQueueEntry<?>> lastSet;
+ private volatile int numberEntries;
+
+ /** Locks and conditions for the blocking queue */
+ private final ReentrantLock lock;
+ private final Condition notFull;
+ private final Condition hasCompletedEntries;
+
+ public UnorderedStreamElementQueue(
+ int capacity,
+ Executor executor,
+ OperatorActions operatorActions) {
+
+ Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
+ this.capacity = capacity;
+
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+
+ this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+ this.uncompletedQueue = new ArrayDeque<>(capacity);
+ this.completedQueue = new ArrayDeque<>(capacity);
+
+ this.firstSet = new HashSet<>(capacity);
+ this.lastSet = firstSet;
+
+ this.numberEntries = 0;
+
+ this.lock = new ReentrantLock();
+ this.notFull = lock.newCondition();
+ this.hasCompletedEntries = lock.newCondition();
+ }
+
+ @Override
+ public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (numberEntries >= capacity) {
+ notFull.await();
+ }
+
+ addEntry(streamElementQueueEntry);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (numberEntries < capacity) {
+ addEntry(streamElementQueueEntry);
+
+ LOG.debug("Put element into ordered stream element queue. New filling degree " +
+ "({}/{}).", numberEntries, capacity);
+
+ return true;
+ } else {
+ LOG.debug("Failed to put element into ordered stream element queue because it " +
+ "was full ({}/{}).", numberEntries, capacity);
+
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public AsyncResult peekBlockingly() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (completedQueue.isEmpty()) {
+ hasCompletedEntries.await();
+ }
+
+ LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
+ "({}/{}).", numberEntries, capacity);
+
+ return completedQueue.peek();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public AsyncResult poll() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ while (completedQueue.isEmpty()) {
+ hasCompletedEntries.await();
+ }
+
+ numberEntries--;
+ notFull.signalAll();
+
+ LOG.debug("Polled element from unordered stream element queue. New filling degree " +
+ "({}/{}).", numberEntries, capacity);
+
+ return completedQueue.poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];
+
+ array = completedQueue.toArray(array);
+
+ int counter = completedQueue.size();
+
+ for (StreamElementQueueEntry<?> entry: firstSet) {
+ array[counter] = entry;
+ counter++;
+ }
+
+ for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {
+
+ for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
+ array[counter] = streamElementQueueEntry;
+ counter++;
+ }
+ }
+
+ return Arrays.asList(array);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return numberEntries == 0;
+ }
+
+ @Override
+ public int size() {
+ return numberEntries;
+ }
+
+ /**
+ * Callback for onComplete events for the given stream element queue entry. Whenever a queue
+ * entry is completed, it is checked whether this entry belogns to the first set. If this is the
+ * case, then the element is added to the completed entries queue from where it can be consumed.
+ * If the first set becomes empty, then the next set is polled from the uncompleted entries
+ * queue. Completed entries from this new set are then added to the completed entries queue.
+ *
+ * @param streamElementQueueEntry which has been completed
+ * @throws InterruptedException if the current thread has been interrupted while performing the
+ * on complete callback.
+ */
+ public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
+ lock.lockInterruptibly();
+
+ try {
+ if (firstSet.remove(streamElementQueueEntry)) {
+ completedQueue.offer(streamElementQueueEntry);
+
+ while (firstSet.isEmpty() && firstSet != lastSet) {
+ firstSet = uncompletedQueue.poll();
+
+ Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
+
+ while (it.hasNext()) {
+ StreamElementQueueEntry<?> bufferEntry = it.next();
+
+ if (bufferEntry.isDone()) {
+ completedQueue.offer(bufferEntry);
+ it.remove();
+ }
+ }
+ }
+
+ LOG.debug("Signal unordered stream element queue has completed entries.");
+ hasCompletedEntries.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Add the given stream element queue entry to the current last set if it is not a watermark.
+ * If it is a watermark, then stop adding to the current last set, insert the watermark into its
+ * own set and add a new last set.
+ *
+ * @param streamElementQueueEntry to be inserted
+ * @param <T> Type of the stream element queue entry's result
+ */
+ private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
+ assert(lock.isHeldByCurrentThread());
+
+ if (streamElementQueueEntry.isWatermark()) {
+ lastSet = new HashSet<>(capacity);
+
+ if (firstSet.isEmpty()) {
+ firstSet.add(streamElementQueueEntry);
+ } else {
+ Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
+ watermarkSet.add(streamElementQueueEntry);
+ uncompletedQueue.offer(watermarkSet);
+ }
+ uncompletedQueue.offer(lastSet);
+ } else {
+ lastSet.add(streamElementQueueEntry);
+ }
+
+ streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
+ @Override
+ public void accept(StreamElementQueueEntry<T> value) {
+ try {
+ onCompleteHandler(value);
+ } catch (InterruptedException e) {
+ // The accept executor thread got interrupted. This is probably cause by
+ // the shutdown of the executor.
+ LOG.debug("AsyncBufferEntry could not be properly completed because the " +
+ "executor thread has been interrupted.", e);
+ } catch (Throwable t) {
+ operatorActions.failOperator(new Exception("Could not complete the " +
+ "stream element queue entry: " + value + '.', t));
+ }
+ }
+ }, executor);
+
+ numberEntries++;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
new file mode 100644
index 0000000..6fe4f44
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
+ */
+public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
+
+ private final Future<Watermark> future;
+
+ public WatermarkQueueEntry(Watermark watermark) {
+ super(watermark);
+
+ this.future = FlinkCompletableFuture.completed(watermark);
+ }
+
+ @Override
+ public Watermark getWatermark() {
+ return (Watermark) getStreamElement();
+ }
+
+ @Override
+ protected Future<Watermark> getFuture() {
+ return future;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 680cc29..7771064 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -52,7 +52,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-
/**
* The {@code OperatorChain} contains all operators that are executed as one chain within a single
* {@link StreamTask}.
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 0fb22b8..bd9215a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -326,11 +326,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
LOG.error("Could not shut down async checkpoint threads", t);
}
- // release the output resources. this method should never fail.
- if (operatorChain != null) {
- operatorChain.releaseOutputs();
- }
-
// we must! perform this cleanup
try {
cleanup();
@@ -344,6 +339,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (!disposed) {
disposeAllOperators();
}
+
+ // release the output resources. this method should never fail.
+ if (operatorChain != null) {
+ operatorChain.releaseOutputs();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index b8788c6..12ac693 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -18,147 +18,252 @@
package org.apache.flink.streaming.api.functions.async;
-import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.junit.Assert;
import org.junit.Test;
-import static org.mockito.Matchers.anyString;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * Test case for {@link RichAsyncFunction}
+ * Test cases for {@link RichAsyncFunction}
*/
public class RichAsyncFunctionTest {
- private RichAsyncFunction<String, String> initFunction() {
- RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
+ /**
+ * Test the set of iteration runtime context methods in the context of a
+ * {@link RichAsyncFunction}.
+ */
+ @Test
+ public void testIterationRuntimeContext() throws Exception {
+ RichAsyncFunction<Integer, Integer> function = new RichAsyncFunction<Integer, Integer>() {
+ private static final long serialVersionUID = -2023923961609455894L;
+
@Override
- public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
- getRuntimeContext().getState(mock(ValueStateDescriptor.class));
+ public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+ // no op
}
};
- return function;
+ int superstepNumber = 42;
+
+ IterationRuntimeContext mockedIterationRuntimeContext = mock(IterationRuntimeContext.class);
+ when(mockedIterationRuntimeContext.getSuperstepNumber()).thenReturn(superstepNumber);
+ function.setRuntimeContext(mockedIterationRuntimeContext);
+
+ IterationRuntimeContext iterationRuntimeContext = function.getIterationRuntimeContext();
+
+ assertEquals(superstepNumber, iterationRuntimeContext.getSuperstepNumber());
+
+ try {
+ iterationRuntimeContext.getIterationAggregator("foobar");
+ fail("Expected getIterationAggregator to fail with unsupported operation exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ iterationRuntimeContext.getPreviousIterationAggregate("foobar");
+ fail("Expected getPreviousIterationAggregator to fail with unsupported operation exception");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
}
+ /**
+ * Test the set of runtime context methods in the context of a {@link RichAsyncFunction}.
+ */
@Test
- public void testIterationRuntimeContext() throws Exception {
- // test runtime context is not set
- RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
+ public void testRuntimeContext() throws Exception {
+ RichAsyncFunction<Integer, Integer> function = new RichAsyncFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 1707630162838967972L;
+
@Override
- public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
- getIterationRuntimeContext().getIterationAggregator("test");
+ public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+ // no op
}
};
+ final String taskName = "foobarTask";
+ final MetricGroup metricGroup = mock(MetricGroup.class);
+ final int numberOfParallelSubtasks = 42;
+ final int indexOfSubtask = 43;
+ final int attemptNumber = 1337;
+ final String taskNameWithSubtask = "barfoo";
+ final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
+ final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
+
+ RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class);
+
+ when(mockedRuntimeContext.getTaskName()).thenReturn(taskName);
+ when(mockedRuntimeContext.getMetricGroup()).thenReturn(metricGroup);
+ when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(numberOfParallelSubtasks);
+ when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(indexOfSubtask);
+ when(mockedRuntimeContext.getAttemptNumber()).thenReturn(attemptNumber);
+ when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
+ when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
+ when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
+
+ function.setRuntimeContext(mockedRuntimeContext);
+
+ RuntimeContext runtimeContext = function.getRuntimeContext();
+
+ assertEquals(taskName, runtimeContext.getTaskName());
+ assertEquals(metricGroup, runtimeContext.getMetricGroup());
+ assertEquals(numberOfParallelSubtasks, runtimeContext.getNumberOfParallelSubtasks());
+ assertEquals(indexOfSubtask, runtimeContext.getIndexOfThisSubtask());
+ assertEquals(attemptNumber, runtimeContext.getAttemptNumber());
+ assertEquals(taskNameWithSubtask, runtimeContext.getTaskNameWithSubtasks());
+ assertEquals(executionConfig, runtimeContext.getExecutionConfig());
+ assertEquals(userCodeClassLoader, runtimeContext.getUserCodeClassLoader());
+
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
- }
- catch (Exception e) {
- Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+ runtimeContext.getDistributedCache();
+ fail("Expected getDistributedCached to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // test get agg from iteration runtime context
- function.setRuntimeContext(mock(IterationRuntimeContext.class));
-
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ runtimeContext.getState(new ValueStateDescriptor<>("foobar", Integer.class, 42));
+ fail("Expected getState to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- catch (Exception e) {
- Assert.assertEquals("Get iteration aggregator is not supported in rich async function", e.getMessage());
+
+ try {
+ runtimeContext.getListState(new ListStateDescriptor<>("foobar", Integer.class));
+ fail("Expected getListState to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // get state from iteration runtime context
- function = new RichAsyncFunction<String, String>() {
- @Override
- public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
- getIterationRuntimeContext().getState(mock(ValueStateDescriptor.class));
- }
- };
+ try {
+ runtimeContext.getReducingState(new ReducingStateDescriptor<>("foobar", new ReduceFunction<Integer>() {
+ private static final long serialVersionUID = 2136425961884441050L;
- function.setRuntimeContext(mock(RuntimeContext.class));
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1;
+ }
+ }, Integer.class));
+ fail("Expected getReducingState to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
- }
- catch (Exception e) {
- Assert.assertEquals("State is not supported in rich async function", e.getMessage());
- }
+ runtimeContext.addAccumulator("foobar", new Accumulator<Integer, Integer>() {
+ private static final long serialVersionUID = -4673320336846482358L;
- // test getting a counter from iteration runtime context
- function = new RichAsyncFunction<String, String>() {
- @Override
- public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
- getIterationRuntimeContext().getIntCounter("test").add(6);
- }
- };
+ @Override
+ public void add(Integer value) {
+ // no op
+ }
- IterationRuntimeContext context = mock(IterationRuntimeContext.class);
- IntCounter counter = new IntCounter(0);
- when(context.getIntCounter(anyString())).thenReturn(counter);
+ @Override
+ public Integer getLocalValue() {
+ return null;
+ }
- function.setRuntimeContext(context);
+ @Override
+ public void resetLocal() {
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ }
- Assert.assertTrue(6 == counter.getLocalValue());
- }
+ @Override
+ public void merge(Accumulator<Integer, Integer> other) {
- @Test
- public void testRuntimeContext() throws Exception {
- // test run time context is not set
- RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
- @Override
- public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
- getRuntimeContext().getState(mock(ValueStateDescriptor.class));
- }
- };
+ }
- try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
- }
- catch (Exception e) {
- Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+ @Override
+ public Accumulator<Integer, Integer> clone() {
+ return null;
+ }
+ });
+ fail("Expected addAccumulator to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // test get state
- function = new RichAsyncFunction<String, String>() {
- @Override
- public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
- getRuntimeContext().getState(mock(ValueStateDescriptor.class));
- }
- };
+ try {
+ runtimeContext.getAccumulator("foobar");
+ fail("Expected getAccumulator to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- function.setRuntimeContext(mock(RuntimeContext.class));
+ try {
+ runtimeContext.getAllAccumulators();
+ fail("Expected getAllAccumulators to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
try {
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ runtimeContext.getIntCounter("foobar");
+ fail("Expected getIntCounter to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- catch (Exception e) {
- Assert.assertEquals("State is not supported in rich async function", e.getMessage());
+
+ try {
+ runtimeContext.getLongCounter("foobar");
+ fail("Expected getLongCounter to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
}
- // test getting a counter from runtime context
- function = new RichAsyncFunction<String, String>() {
- @Override
- public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
- getIterationRuntimeContext().getIntCounter("test").add(6);
- }
- };
+ try {
+ runtimeContext.getDoubleCounter("foobar");
+ fail("Expected getDoubleCounter to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- IterationRuntimeContext context = mock(IterationRuntimeContext.class);
- IntCounter counter = new IntCounter(0);
- when(context.getIntCounter(anyString())).thenReturn(counter);
+ try {
+ runtimeContext.getHistogram("foobar");
+ fail("Expected getHistogram to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- function.setRuntimeContext(context);
+ try {
+ runtimeContext.hasBroadcastVariable("foobar");
+ fail("Expected hasBroadcastVariable to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- function.asyncInvoke("test", mock(AsyncCollector.class));
+ try {
+ runtimeContext.getBroadcastVariable("foobar");
+ fail("Expected getBroadcastVariable to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
- Assert.assertTrue(6 == counter.getLocalValue());
+ try {
+ runtimeContext.getBroadcastVariableWithInitializer("foobar", new BroadcastVariableInitializer<Object, Object>() {
+ @Override
+ public Object initializeBroadcastVariable(Iterable<Object> data) {
+ return null;
+ }
+ });
+ fail("Expected getBroadcastVariableWithInitializer to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
deleted file mode 100644
index d118d80..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
+++ /dev/null
@@ -1,656 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.async;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.functions.async.buffer.StreamElementEntry;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer;
-import org.apache.flink.streaming.api.functions.async.AsyncFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link AsyncCollectorBuffer}. These test that:
- *
- * <ul>
- * <li>Add a new item into the buffer</li>
- * <li>Ordered mode processing</li>
- * <li>Unordered mode processing</li>
- * <li>Error handling</li>
- * </ul>
- */
-public class AsyncCollectorBufferTest {
- private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
-
- private final Random RANDOM = new Random();
-
- private AsyncFunction<Integer, Integer> function;
-
- private AsyncWaitOperator<Integer, Integer> operator;
-
- private AsyncCollectorBuffer<Integer, Integer> buffer;
-
- private Output<StreamRecord<Integer>> output;
-
- private Object lock = new Object();
-
- public AsyncCollectorBuffer<Integer, Integer> getBuffer(int bufferSize, AsyncDataStream.OutputMode mode) throws Exception {
- function = new AsyncFunction<Integer, Integer>() {
- @Override
- public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
-
- }
- };
-
- operator = new AsyncWaitOperator<>(function, bufferSize, mode);
-
- StreamConfig cfg = new StreamConfig(new Configuration());
- cfg.setTypeSerializerIn1(IntSerializer.INSTANCE);
-
- StreamTask<?, ?> mockTask = mock(StreamTask.class);
-
- when(mockTask.getCheckpointLock()).thenReturn(lock);
-
- Environment env = new DummyEnvironment("DUMMY;-D", 1, 0);
- when(mockTask.getEnvironment()).thenReturn(env);
-
- output = new FakedOutput();
-
- operator.setup(mockTask, cfg, output);
-
- buffer = operator.getBuffer();
-
- return buffer;
- }
-
- @Test
- public void testAdd() throws Exception {
- buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
-
- synchronized (lock) {
- buffer.addWatermark(new Watermark(0l));
- buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1));
- }
-
- Assert.assertEquals(2, buffer.getQueue().size());
-
- Iterator<StreamElementEntry<Integer>> iterator = buffer.getQueue().iterator();
- Watermark watermark = iterator.next().getStreamElement().asWatermark();
- Assert.assertEquals(0l, watermark.getTimestamp());
-
- LatencyMarker latencyMarker = iterator.next().getStreamElement().asLatencyMarker();
- Assert.assertEquals(111l, latencyMarker.getMarkedTime());
-
- buffer.setExtraStreamElement(new Watermark(222l));
-
- Iterator<StreamElement> elementIterator = buffer.getStreamElementsInBuffer();
- Assert.assertEquals(0l, elementIterator.next().asWatermark().getTimestamp());
- Assert.assertEquals(111l, elementIterator.next().asLatencyMarker().getMarkedTime());
- Assert.assertEquals(222l, elementIterator.next().asWatermark().getTimestamp());
- Assert.assertFalse(elementIterator.hasNext());
- }
-
- private void work(final boolean throwExcept) throws Exception {
- final int ASYNC_COLLECTOR_NUM = 7;
-
- Iterator<StreamElement> iterator = new Iterator<StreamElement>() {
- private int idx = 0;
-
- @Override
- public boolean hasNext() {
- return idx < ASYNC_COLLECTOR_NUM;
- }
-
- @Override
- public StreamElement next() {
- ++idx;
-
- if (idx == 4) {
- return new Watermark(333l);
- }
- else if (idx == 7) {
- return new LatencyMarker(111L, 0, 0);
- }
- else {
- StreamRecord<Integer> ret = new StreamRecord<>(idx);
- ret.setTimestamp(idx * idx);
-
- return ret;
- }
- }
-
- @Override
- public void remove() {
- // do nothing
- }
- };
-
- while (iterator.hasNext()) {
- final StreamElement record = iterator.next();
-
- if (record.isRecord()) {
- AsyncCollector tmp;
-
- synchronized (lock) {
- tmp = buffer.addStreamRecord(record.<Integer>asRecord());
- }
-
- final AsyncCollector collector = tmp;
-
- EXECUTOR_SERVICE.submit(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(RANDOM.nextInt(100));
-
- if (throwExcept) {
- collector.collect(new Exception("wahahahaha..."));
- }
- else {
- collector.collect(Collections.singletonList(record.asRecord().getValue()));
- }
- } catch (InterruptedException e) {
- // do nothing
- }
- }
- });
- }
- else if (record.isWatermark()) {
- synchronized (lock) {
- buffer.addWatermark(record.asWatermark());
- }
- }
- else {
- synchronized (lock) {
- buffer.addLatencyMarker(record.asLatencyMarker());
- }
- }
- }
- }
-
- @Test
- public void testOrderedBuffer() throws Exception {
- buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
-
- buffer.startEmitterThread();
-
- work(false);
-
- synchronized (lock) {
- buffer.waitEmpty();
- }
-
- buffer.stopEmitterThread();
-
- Assert.assertEquals("1,2,3,5,6,", ((FakedOutput)output).getValue());
- Assert.assertEquals("1,4,9,333,25,36,111,", ((FakedOutput)output).getTimestamp());
- }
-
- @Test
- public void testUnorderedBuffer() throws Exception {
- buffer = getBuffer(3, AsyncDataStream.OutputMode.UNORDERED);
-
- buffer.startEmitterThread();
-
- work(false);
-
- synchronized (lock) {
- buffer.waitEmpty();
- }
-
- buffer.stopEmitterThread();
-
- Assert.assertEquals(333L, ((FakedOutput)output).getRawTimestamp().toArray()[3]);
-
- List<Long> result = ((FakedOutput)output).getRawValue();
- Collections.sort(result);
- Assert.assertEquals("[1, 2, 3, 5, 6]", result.toString());
-
- result = ((FakedOutput)output).getRawTimestamp();
- Collections.sort(result);
- Assert.assertEquals("[1, 4, 9, 25, 36, 111, 333]", result.toString());
- }
-
- @Test
- public void testOrderedBufferWithManualTriggering() throws Exception {
- // test AsyncCollectorBuffer with different combinations of StreamElements in the buffer.
- // by triggering completion of each AsyncCollector one by one manually, we can verify
- // the output one by one accurately.
-
- FakedOutput fakedOutput;
- AsyncCollector<Integer> collector1, collector2;
-
- // 1. head element is a Watermark or LatencyMarker
- buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
- fakedOutput = (FakedOutput)output;
-
- fakedOutput.expect(1);
-
- buffer.startEmitterThread();
-
- synchronized (lock) {
- buffer.addWatermark(new Watermark(1L));
- }
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("", fakedOutput.getValue());
- Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
-
- fakedOutput.expect(1);
-
- synchronized (lock) {
- buffer.addLatencyMarker(new LatencyMarker(2L, 0, 0));
- }
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("", fakedOutput.getValue());
- Assert.assertEquals("1,2,", fakedOutput.getTimestamp());
-
- synchronized (lock) {
- buffer.waitEmpty();
- buffer.stopEmitterThread();
- }
-
-
- // 2. buffer layout: WM -> SR1 -> LM -> SR2, where SR2 finishes first, then SR1.
- buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
- fakedOutput = (FakedOutput)output;
-
- synchronized (lock) {
- buffer.addWatermark(new Watermark(1L));
- collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
- buffer.addLatencyMarker(new LatencyMarker(3L, 0, 0));
- collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 4L));
- }
-
- fakedOutput.expect(1);
-
- buffer.startEmitterThread();
-
- fakedOutput.waitToFinish();
-
- // in ORDERED mode, the result of completed SR2 will not be emitted right now.
- collector2.collect(Collections.singletonList(222));
-
- Thread.sleep(1000);
-
- Assert.assertEquals("", fakedOutput.getValue());
- Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
- fakedOutput.expect(3);
-
- collector1.collect(Collections.singletonList(111));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("111,222,", fakedOutput.getValue());
- Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
-
- synchronized (lock) {
- buffer.waitEmpty();
- buffer.stopEmitterThread();
- }
-
- // 3. buffer layout: WM -> SR1 -> LM -> S2, where SR1 completes first, then SR2.
- buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
- fakedOutput = (FakedOutput)output;
-
- synchronized (lock) {
- buffer.addWatermark(new Watermark(1L));
- collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
- buffer.addLatencyMarker(new LatencyMarker(3L, 0, 0));
- collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 4L));
- }
-
- fakedOutput.expect(1);
-
- buffer.startEmitterThread();
-
- fakedOutput.waitToFinish();
-
- fakedOutput.expect(2);
-
- // in ORDERED mode, the result of completed SR1 will be emitted asap.
- collector1.collect(Collections.singletonList(111));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("111,", fakedOutput.getValue());
- Assert.assertEquals("1,2,3,", fakedOutput.getTimestamp());
-
- fakedOutput.expect(1);
-
- collector2.collect(Collections.singletonList(222));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("111,222,", fakedOutput.getValue());
- Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
-
- synchronized (lock) {
- buffer.waitEmpty();
- buffer.stopEmitterThread();
- }
-
- // 4. buffer layout: SR1 -> SR2 -> WM -> LM, where SR2 finishes first.
- buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
- fakedOutput = (FakedOutput)output;
-
- synchronized (lock) {
- collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 1L));
- collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 2L));
- buffer.addWatermark(new Watermark(3L));
- buffer.addLatencyMarker(new LatencyMarker(4L, 0, 0));
- }
-
- buffer.startEmitterThread();
-
- // in ORDERED mode, the result of completed SR2 will not be emitted right now.
- collector2.collect(Collections.singletonList(222));
-
- Thread.sleep(1000);
-
- Assert.assertEquals("", fakedOutput.getValue());
- Assert.assertEquals("", fakedOutput.getTimestamp());
-
- fakedOutput.expect(4);
-
- collector1.collect(Collections.singletonList(111));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("111,222,", fakedOutput.getValue());
- Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
-
- synchronized (lock) {
- buffer.waitEmpty();
- buffer.stopEmitterThread();
- }
- }
-
- @Test
- public void testUnorderedWithManualTriggering() throws Exception {
- // verify the output in UNORDERED mode by manual triggering.
-
- FakedOutput fakedOutput;
- AsyncCollector<Integer> collector1, collector2, collector3;
-
- // 1. head element is a Watermark or LatencyMarker
- buffer = getBuffer(5, AsyncDataStream.OutputMode.UNORDERED);
- fakedOutput = (FakedOutput)output;
-
- fakedOutput.expect(1);
-
- buffer.startEmitterThread();
-
- synchronized (lock) {
- buffer.addWatermark(new Watermark(1L));
- }
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("", fakedOutput.getValue());
- Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
-
- fakedOutput.expect(1);
-
- synchronized (lock) {
- buffer.addLatencyMarker(new LatencyMarker(2L, 0, 0));
- }
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("", fakedOutput.getValue());
- Assert.assertEquals("1,2,", fakedOutput.getTimestamp());
-
- synchronized (lock) {
- buffer.waitEmpty();
- buffer.stopEmitterThread();
- }
-
-
- // 2. buffer layout: LM -> SR1 -> SR2 -> WM1 -> SR3 -> WM2, where the order of completion is SR3, SR2, SR1
- buffer = getBuffer(6, AsyncDataStream.OutputMode.UNORDERED);
- fakedOutput = (FakedOutput)output;
-
- synchronized (lock) {
- buffer.addLatencyMarker(new LatencyMarker(1L, 0, 0));
- collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
- collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 3L));
- buffer.addWatermark(new Watermark(4L));
- collector3 = buffer.addStreamRecord(new StreamRecord<>(333, 5L));
- buffer.addWatermark(new Watermark(6L));
- }
-
- fakedOutput.expect(1);
-
- buffer.startEmitterThread();
-
- fakedOutput.waitToFinish();
-
- // in UNORDERED mode, the result of completed SR3 will not be emitted right now.
- collector3.collect(Collections.singletonList(333));
-
- Thread.sleep(1000);
-
- Assert.assertEquals("", fakedOutput.getValue());
- Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
- fakedOutput.expect(1);
-
- // SR2 will be emitted
- collector2.collect(Collections.singletonList(222));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("222,", fakedOutput.getValue());
- Assert.assertEquals("1,3,", fakedOutput.getTimestamp());
-
- // SR1 will be emitted first, then WM, and then SR3 and WM2
- fakedOutput.expect(4);
- collector1.collect(Collections.singletonList(111));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("222,111,333,", fakedOutput.getValue());
- Assert.assertEquals("1,3,2,4,5,6,", fakedOutput.getTimestamp());
-
- synchronized (lock) {
- buffer.waitEmpty();
- buffer.stopEmitterThread();
- }
-
- // 3. buffer layout: WM1 -> SR1 -> SR2 -> LM -> SR3 -> WM2, where the order of completion is SR2, SR1, SR3
- buffer = getBuffer(6, AsyncDataStream.OutputMode.UNORDERED);
- fakedOutput = (FakedOutput)output;
-
- synchronized (lock) {
- buffer.addWatermark(new Watermark(1L));
- collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
- collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 3L));
- buffer.addLatencyMarker(new LatencyMarker(4L, 0, 0));
- collector3 = buffer.addStreamRecord(new StreamRecord<>(333, 5L));
- buffer.addWatermark(new Watermark(6L));
- }
-
- // the result of SR2 will be emitted following WM1
- collector2.collect(Collections.singletonList(222));
-
- fakedOutput.expect(2);
-
- buffer.startEmitterThread();
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("222,", fakedOutput.getValue());
- Assert.assertEquals("1,3,", fakedOutput.getTimestamp());
-
- // SR1 and LM will be emitted
- fakedOutput.expect(2);
- collector1.collect(Collections.singletonList(111));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("222,111,", fakedOutput.getValue());
- Assert.assertEquals("1,3,2,4,", fakedOutput.getTimestamp());
-
- // SR3 and WM2 will be emitted
- fakedOutput.expect(2);
- collector3.collect(Collections.singletonList(333));
-
- fakedOutput.waitToFinish();
-
- Assert.assertEquals("222,111,333,", fakedOutput.getValue());
- Assert.assertEquals("1,3,2,4,5,6,", fakedOutput.getTimestamp());
-
- synchronized (lock) {
- buffer.waitEmpty();
- buffer.stopEmitterThread();
- }
-
- }
-
-
-
- @Test
- public void testBufferWithException() throws Exception {
- buffer = getBuffer(3, AsyncDataStream.OutputMode.UNORDERED);
-
- buffer.startEmitterThread();
-
- IOException expected = null;
- try {
- work(true);
- }
- catch (IOException e) {
- expected = e;
- }
-
- Assert.assertNotNull(expected);
- Assert.assertEquals(expected.getMessage(), "wahahahaha...");
-
- synchronized (lock) {
- buffer.waitEmpty();
- }
-
- buffer.stopEmitterThread();
- }
-
- public class FakedOutput implements Output<StreamRecord<Integer>> {
- private List<Long> outputs;
- private List<Long> timestamps;
-
- private CountDownLatch latch;
-
- public FakedOutput() {
- this.outputs = new ArrayList<>();
- this.timestamps = new ArrayList<>();
- }
-
- @Override
- public void collect(StreamRecord<Integer> record) {
- outputs.add(record.getValue().longValue());
- if (record.hasTimestamp()) {
- timestamps.add(record.getTimestamp());
- }
-
- if (latch != null) {
- latch.countDown();
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- timestamps.add(mark.getTimestamp());
-
- if (latch != null) {
- latch.countDown();
- }
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- timestamps.add(latencyMarker.getMarkedTime());
-
- if (latch != null) {
- latch.countDown();
- }
- }
-
- @Override
- public void close() {
- }
-
- public String getValue() {
- StringBuilder sb = new StringBuilder();
- for (Long i : outputs) {
- sb.append(i).append(",");
- }
- return sb.toString();
- }
-
- public String getTimestamp() {
- StringBuilder sb = new StringBuilder();
- for (Long i : timestamps) {
- sb.append(i).append(",");
- }
- return sb.toString();
- }
-
- public List<Long> getRawValue() {
- return outputs;
- }
-
- public List<Long> getRawTimestamp() {
- return timestamps;
- }
-
- public void expect(int count) {
- latch = new CountDownLatch(count);
- }
-
- public void waitToFinish() throws InterruptedException {
- latch.await();
- }
- }
-}