You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/29 00:11:45 UTC
[2/3] samza git commit: SAMZA-1386: Inline End-of-stream and
Watermark logic inside OperatorImpl
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 99496eb..9b747bc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -18,9 +18,14 @@
*/
package org.apache.samza.operators.impl;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.job.model.JobModel;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -36,6 +41,8 @@ import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
@@ -50,6 +57,7 @@ import java.util.Map;
* The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s.
*/
public class OperatorImplGraph {
+ private static final Logger LOG = LoggerFactory.getLogger(OperatorImplGraph.class);
/**
* A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating
@@ -84,10 +92,27 @@ public class OperatorImplGraph {
*/
public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, TaskContext context, Clock clock) {
this.clock = clock;
+ // Producer task counts are only computed when some input stream is also an output (intermediate) stream.
+ TaskContextImpl taskContext = (TaskContextImpl) context;
+ Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(streamGraph) ?
+ getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()),
+ getIntermediateToInputStreamsMap(streamGraph)) :
+ Collections.<SystemStream, Integer>emptyMap();
+ producerTaskCounts.forEach((stream, count) -> {
+ LOG.info("{} has {} producer tasks.", stream, count);
+ });
+
+ // set states for end-of-stream
+ taskContext.registerObject(EndOfStreamStates.class.getName(),
+ new EndOfStreamStates(context.getSystemStreamPartitions(), producerTaskCounts));
+ // set states for watermark
+ taskContext.registerObject(WatermarkStates.class.getName(),
+ new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts));
+
streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
InputOperatorImpl inputOperatorImpl =
- (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, config, context);
+ (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context);
this.inputOperators.put(systemStream, inputOperatorImpl);
});
}
@@ -128,17 +153,18 @@ public class OperatorImplGraph {
* @return the operator implementation for the operatorSpec
*/
OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
- Config config, TaskContext context) {
+ SystemStream inputStream, Config config, TaskContext context) {
if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
// Either this is the first time we've seen this operatorSpec, or this is a join operator spec
// and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
operatorImpl.init(config, context);
+ operatorImpl.registerInputStream(inputStream);
operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl);
Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
registeredSpecs.forEach(registeredSpec -> {
- OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, config, context);
+ OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
operatorImpl.registerNextOperator(nextImpl);
});
return operatorImpl;
@@ -246,4 +272,70 @@ public class OperatorImplGraph {
}
};
}
+
+ private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) { // true iff some stream spec is both an input and an output of the graph
+ return !Collections.disjoint(streamGraph.getInputOperators().keySet(), streamGraph.getOutputStreams().keySet());
+ }
+
+  /**
+   * Counts, per intermediate stream, the distinct tasks that consume any input stream feeding it.
+   * @param streamToConsumerTasks input stream to the names of the tasks consuming it
+   * @param intermediateToInputStreams intermediate stream to the input streams feeding it
+   * @return mapping from intermediate stream to its producer task count
+   */
+  static Map<SystemStream, Integer> getProducerTaskCountForIntermediateStreams(
+      Multimap<SystemStream, String> streamToConsumerTasks,
+      Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
+    Map<SystemStream, Integer> producerCounts = new HashMap<>();
+    for (Map.Entry<SystemStream, Collection<SystemStream>> entry : intermediateToInputStreams.asMap().entrySet()) {
+      Collection<String> producerTasks = entry.getValue().stream()
+          .flatMap(inputStream -> streamToConsumerTasks.get(inputStream).stream())
+          .collect(Collectors.toSet());
+      producerCounts.put(entry.getKey(), producerTasks.size());
+    }
+    return producerCounts;
+  }
+
+ /**
+ * Calculates the mapping from each input stream to the names of the tasks (in any container) consuming its partitions.
+ * @param jobModel the {@link JobModel} whose containers and tasks are scanned
+ * @return mapping from input stream to consumer task names; a HashMultimap stores each (stream, task) pair once
+ */
+ static Multimap<SystemStream, String> getStreamToConsumerTasks(JobModel jobModel) {
+ Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create();
+ jobModel.getContainers().values().forEach(containerModel -> {
+ containerModel.getTasks().values().forEach(taskModel -> {
+ taskModel.getSystemStreamPartitions().forEach(ssp -> {
+ streamToConsumerTasks.put(ssp.getSystemStream(), taskModel.getTaskName().getTaskName());
+ });
+ });
+ });
+ return streamToConsumerTasks;
+ }
+
+  /**
+   * Calculates the mapping from each intermediate (partitionBy output) stream to the input streams that reach it.
+   * @param streamGraph the user {@link StreamGraphImpl} instance
+   * @return mapping from intermediate streams to the input streams feeding them
+   */
+  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) {
+    Multimap<SystemStream, SystemStream> intermediateToInputs = HashMultimap.create();
+    streamGraph.getInputOperators().forEach((inputSpec, inputOpSpec) ->
+        computeOutputToInput(inputSpec.toSystemStream(), inputOpSpec, intermediateToInputs));
+
+    return intermediateToInputs;
+  }
+
+  private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec,
+      Multimap<SystemStream, SystemStream> outputToInputStreams) {
+    if (!(opSpec instanceof OutputOperatorSpec)) {
+      Collection<OperatorSpec> downstream = opSpec.getRegisteredOperatorSpecs();
+      downstream.forEach(next -> computeOutputToInput(input, next, outputToInputStreams));
+      return;
+    }
+    OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec;
+    if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
+      outputToInputStreams.put(outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(), input);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index f212b3e..205bba6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -21,12 +21,16 @@ package org.apache.samza.operators.impl;
import java.util.Collection;
import java.util.Collections;
import org.apache.samza.config.Config;
-import org.apache.samza.control.Watermark;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.system.ControlMessage;
+import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
@@ -39,10 +43,16 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
private final OutputOperatorSpec<M> outputOpSpec;
private final OutputStreamImpl<?, ?, M> outputStream;
+ private final String taskName;
+ private final ControlMessageSender controlMessageSender;
OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) {
this.outputOpSpec = outputOpSpec;
this.outputStream = outputOpSpec.getOutputStream();
+ this.taskName = context.getTaskName().getTaskName();
+
+ StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache();
+ this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
}
@Override
@@ -71,12 +81,22 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
}
@Override
- protected long handleWatermark(Watermark inputWatermark,
- MessageCollector collector,
- TaskCoordinator coordinator) {
+ protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
- inputWatermark.propagate(outputStream.getStreamSpec().toSystemStream());
+ sendControlMessage(new EndOfStreamMessage(taskName), collector);
}
- return inputWatermark.getTimestamp();
+ }
+
+ @Override
+ protected Long handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
+ if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
+ sendControlMessage(new WatermarkMessage(watermark, taskName), collector);
+ }
+ return watermark;
+ }
+
+  private void sendControlMessage(ControlMessage message, MessageCollector collector) {
+    SystemStream outputSystemStream = outputStream.getStreamSpec().toSystemStream();
+    controlMessageSender.send(message, outputSystemStream, collector);
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
new file mode 100644
index 0000000..0295626
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.WatermarkMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the watermarks coming from input/intermediate streams in a task. Internally it keeps track
+ * of the latest watermark timestamp from each upstream task, and uses the min as the consolidated watermark time.
+ *
+ * This class is thread-safe. However, having parallelism within a task may result in out-of-order processing
+ * and inaccurate watermarks. In this scenario, watermarks might be emitted before the previous messages are fully processed.
+ */
+class WatermarkStates {
+  private static final Logger LOG = LoggerFactory.getLogger(WatermarkStates.class);
+
+  public static final long WATERMARK_NOT_EXIST = -1;
+
+  private static final class WatermarkState {
+    private final int expectedTotal;
+    private final Map<String, Long> timestamps = new HashMap<>();
+    private volatile long watermarkTime = WATERMARK_NOT_EXIST;
+
+    WatermarkState(int expectedTotal) {
+      this.expectedTotal = expectedTotal;
+    }
+
+    synchronized void update(long timestamp, String taskName) {
+      if (taskName != null) {
+        Long ts = timestamps.get(taskName);
+        if (ts != null && ts > timestamp) {
+          LOG.warn("Incoming watermark {} is smaller than existing watermark {} for upstream task {}",
+              timestamp, ts, taskName);
+        } else {
+          timestamps.put(taskName, timestamp);
+        }
+      }
+
+      // Check whether we got all the watermarks.
+      // At a source, the expectedTotal is 0.
+      // For any intermediate stream, the expectedTotal is the upstream task count.
+      if (timestamps.size() == expectedTotal) {
+        Optional<Long> min = timestamps.values().stream().min(Long::compare);
+        watermarkTime = min.orElse(timestamp);
+      }
+    }
+
+    long getWatermarkTime() {
+      return watermarkTime;
+    }
+  }
+
+  private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
+
+  WatermarkStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
+    Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
+    ssps.forEach(ssp -> states.put(ssp, new WatermarkState(producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0))));
+    this.watermarkStates = Collections.unmodifiableMap(states);
+  }
+
+  /**
+   * Update the state upon receiving a watermark message. The consolidated watermark can then be read
+   * via {@link #getWatermark(SystemStream)}; a message for an SSP without watermark state is logged and ignored.
+   * @param watermarkMessage message of {@link WatermarkMessage}
+   * @param ssp system stream partition the message was received from
+   */
+  void update(WatermarkMessage watermarkMessage, SystemStreamPartition ssp) {
+    WatermarkState state = watermarkStates.get(ssp);
+    if (state != null) {
+      state.update(watermarkMessage.getTimestamp(), watermarkMessage.getTaskName());
+    } else {
+      LOG.error("SSP {} doesn't have watermark states", ssp);
+    }
+  }
+
+  /**
+   * Returns the minimum watermark across this task's partitions of {@code systemStream}: {@link #WATERMARK_NOT_EXIST}
+   * if the task has no such partition, or (for non-negative timestamps) while any partition still lacks a watermark.
+   */
+  long getWatermark(SystemStream systemStream) {
+    return watermarkStates.entrySet().stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
+        .map(entry -> entry.getValue().getWatermarkTime())
+        .min(Long::compare)
+        .orElse(WATERMARK_NOT_EXIST);
+  }
+
+  /* package private for testing */
+  long getWatermarkPerSSP(SystemStreamPartition ssp) {
+    return watermarkStates.get(ssp).getWatermarkTime();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 6fbc3c1..773f742 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.spec;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.system.StreamSpec;
import java.util.function.BiFunction;
@@ -49,4 +50,9 @@ public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> {
public BiFunction<K, V, M> getMsgBuilder() {
return this.msgBuilder;
}
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 16f59d7..f4fe0fd 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.spec;
import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
/**
@@ -70,4 +71,9 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { /
public long getTtlMs() {
return ttlMs;
}
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index f64e123..4047d92 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.spec;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.WatermarkFunction;
import java.util.Collection;
import java.util.LinkedHashSet;
@@ -118,4 +119,6 @@ public abstract class OperatorSpec<M, OM> {
public final String getOpName() {
return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
}
+
+  public abstract WatermarkFunction getWatermarkFn(); // may return null, e.g. when the operator's function is not a WatermarkFunction
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index e6767ec..9759392 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -19,6 +19,8 @@
package org.apache.samza.operators.spec;
+import org.apache.samza.operators.functions.WatermarkFunction;
+
/**
* The spec for an operator that outputs a {@link org.apache.samza.operators.MessageStream} to a
* {@link org.apache.samza.system.SystemStream}.
@@ -52,4 +54,9 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
public OutputStreamImpl<?, ?, M> getOutputStream() {
return this.outputStream;
}
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 2b55d95..1145be8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.spec;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
/**
@@ -48,4 +49,9 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
public SinkFunction<M> getSinkFn() {
return this.sinkFn;
}
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return sinkFn instanceof WatermarkFunction ? (WatermarkFunction) sinkFn : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index 1f2f683..aace2e2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.spec;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
/**
@@ -46,4 +47,9 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
public FlatMapFunction<M, OM> getTransformFn() {
return this.transformFn;
}
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ return transformFn instanceof WatermarkFunction ? (WatermarkFunction) transformFn : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 0937499..75f1427 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,6 +19,8 @@
package org.apache.samza.operators.spec;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.triggers.AnyTrigger;
import org.apache.samza.operators.triggers.RepeatingTrigger;
import org.apache.samza.operators.triggers.TimeBasedTrigger;
@@ -109,4 +111,10 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
}
return timeBasedTriggers;
}
+
+ @Override
+ public WatermarkFunction getWatermarkFn() {
+ FoldLeftFunction fn = window.getFoldLeftFunction();
+ return fn instanceof WatermarkFunction ? (WatermarkFunction) fn : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index 0b98ec6..2ed559f 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -21,9 +21,9 @@ package org.apache.samza.serializers;
import java.util.Arrays;
import org.apache.samza.SamzaException;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.message.IntermediateMessageType;
-import org.apache.samza.message.WatermarkMessage;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
+import org.apache.samza.system.WatermarkMessage;
import org.codehaus.jackson.type.TypeReference;
@@ -86,16 +86,16 @@ public class IntermediateMessageSerde implements Serde<Object> {
public Object fromBytes(byte[] bytes) {
try {
final Object object;
- final IntermediateMessageType type = IntermediateMessageType.values()[bytes[0]];
+ final MessageType type = MessageType.values()[bytes[0]];
final byte [] data = Arrays.copyOfRange(bytes, 1, bytes.length);
switch (type) {
case USER_MESSAGE:
object = userMessageSerde.fromBytes(data);
break;
- case WATERMARK_MESSAGE:
+ case WATERMARK:
object = watermarkSerde.fromBytes(data);
break;
- case END_OF_STREAM_MESSAGE:
+ case END_OF_STREAM:
object = eosSerde.fromBytes(data);
break;
default:
@@ -117,15 +117,15 @@ public class IntermediateMessageSerde implements Serde<Object> {
@Override
public byte[] toBytes(Object object) {
final byte [] data;
- final IntermediateMessageType type = IntermediateMessageType.of(object);
+ final MessageType type = MessageType.of(object);
switch (type) {
case USER_MESSAGE:
data = userMessageSerde.toBytes(object);
break;
- case WATERMARK_MESSAGE:
+ case WATERMARK:
data = watermarkSerde.toBytes((WatermarkMessage) object);
break;
- case END_OF_STREAM_MESSAGE:
+ case END_OF_STREAM:
data = eosSerde.toBytes((EndOfStreamMessage) object);
break;
default:
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
index e57a89f..e2fea95 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -21,11 +21,7 @@ package org.apache.samza.task;
import java.util.concurrent.ExecutorService;
import org.apache.samza.config.Config;
-import org.apache.samza.control.ControlMessageListenerTask;
-import org.apache.samza.control.Watermark;
-import org.apache.samza.control.IOGraph;
import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
/**
@@ -34,7 +30,7 @@ import org.apache.samza.system.SystemStream;
* the callbacks once it's done. If the thread pool is null, it follows the legacy
* synchronous model to execute the tasks on the run loop thread.
*/
-public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask, ControlMessageListenerTask {
+public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask {
private final StreamTask wrappedTask;
private final ExecutorService executor;
@@ -100,20 +96,4 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi
((EndOfStreamListenerTask) wrappedTask).onEndOfStream(collector, coordinator);
}
}
-
- @Override
- public IOGraph getIOGraph() {
- if (wrappedTask instanceof ControlMessageListenerTask) {
- return ((ControlMessageListenerTask) wrappedTask).getIOGraph();
- }
- return null;
- }
-
- @Override
- public void onWatermark(Watermark watermark, SystemStream stream, MessageCollector collector, TaskCoordinator coordinator) {
- if (wrappedTask instanceof ControlMessageListenerTask) {
- ((ControlMessageListenerTask) wrappedTask).onWatermark(watermark, stream, collector, coordinator);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 16b7e40..0074e24 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -21,25 +21,28 @@ package org.apache.samza.task;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.control.ControlMessageListenerTask;
-import org.apache.samza.control.Watermark;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.impl.InputOperatorImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
-import org.apache.samza.control.IOGraph;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link StreamTask} implementation that brings all the operator API implementation components together and
* feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
*/
-public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask, ControlMessageListenerTask {
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);
private final StreamApplication streamApplication;
private final ApplicationRunner runner;
@@ -47,7 +50,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
private OperatorImplGraph operatorImplGraph;
private ContextManager contextManager;
- private IOGraph ioGraph;
/**
* Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
@@ -91,7 +93,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
// create the operator impl DAG corresponding to the logical operator spec DAG
this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock);
- this.ioGraph = streamGraph.toIOGraph();
}
/**
@@ -110,7 +111,21 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
if (inputOpImpl != null) {
- inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+ switch (MessageType.of(ime.getMessage())) {
+ case USER_MESSAGE:
+ inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+ break;
+
+ case END_OF_STREAM:
+ EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
+ inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
+ break;
+
+ case WATERMARK:
+ WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
+ inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator);
+ break;
+ }
}
}
@@ -121,22 +136,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
}
@Override
- public IOGraph getIOGraph() {
- return ioGraph;
- }
-
- @Override
- public final void onWatermark(Watermark watermark,
- SystemStream systemStream,
- MessageCollector collector,
- TaskCoordinator coordinator) {
- InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
- if (inputOpImpl != null) {
- inputOpImpl.onWatermark(watermark, collector, coordinator);
- }
- }
-
- @Override
public void close() throws Exception {
if (this.contextManager != null) {
this.contextManager.close();
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 8c739d4..1b2ce80 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -24,6 +24,7 @@ package org.apache.samza.checkpoint
import java.util.HashMap
import java.util.concurrent.ConcurrentHashMap
+import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
import org.apache.samza.config.StreamConfig.Config2Stream
@@ -193,7 +194,7 @@ class OffsetManager(
*/
def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
- if (offset != null) {
+ if (offset != null && !offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
}
}
@@ -216,6 +217,10 @@ class OffsetManager(
}
}
+ def setStartingOffset(taskName: TaskName, ssp: SystemStreamPartition, offset: String): Unit = {
+ startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
+ }
+
/**
* Gets a snapshot of all the current offsets for the specified task. This is useful to
* ensure there are no concurrent updates to the offsets between when this method is
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 5a19d90..acec365 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -20,25 +20,17 @@
package org.apache.samza.container
-import com.google.common.collect.HashMultimap
-import com.google.common.collect.Multimap
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.OffsetManager
import org.apache.samza.config.Config
import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.control.ControlMessageListenerTask
-import org.apache.samza.control.ControlMessageUtils
-import org.apache.samza.control.EndOfStreamManager
-import org.apache.samza.control.WatermarkManager
import org.apache.samza.job.model.JobModel
-import org.apache.samza.message.MessageType
import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.StreamMetadataCache
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.task.AsyncStreamTask
import org.apache.samza.task.ClosableTask
@@ -47,34 +39,11 @@ import org.apache.samza.task.InitableTask
import org.apache.samza.task.ReadableCoordinator
import org.apache.samza.task.StreamTask
import org.apache.samza.task.TaskCallbackFactory
-import org.apache.samza.task.TaskContext
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.task.WindowableTask
import org.apache.samza.util.Logging
import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-
-object TaskInstance {
- /**
- * Build a map from a stream to its consumer tasks
- * @param jobModel job model which contains ssp-to-task assignment
- * @return the map of input stream to tasks
- */
- def buildInputToTasks(jobModel: JobModel): Multimap[SystemStream, String] = {
- val streamToTasks: Multimap[SystemStream, String] = HashMultimap.create[SystemStream, String]
- if (jobModel != null) {
- for (containerModel <- jobModel.getContainers.values) {
- for (taskModel <- containerModel.getTasks.values) {
- for (ssp <- taskModel.getSystemStreamPartitions) {
- streamToTasks.put(ssp.getSystemStream, taskModel.getTaskName.toString)
- }
- }
- }
- }
- return streamToTasks
- }
-}
class TaskInstance(
val task: Any,
@@ -97,35 +66,10 @@ class TaskInstance(
val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
val isClosableTask = task.isInstanceOf[ClosableTask]
val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
- val isControlMessageListener = task.isInstanceOf[ControlMessageListenerTask]
-
- val context = new TaskContext {
- var userContext: Object = null;
- def getMetricsRegistry = metrics.registry
- def getSystemStreamPartitions = systemStreamPartitions.asJava
- def getStore(storeName: String) = if (storageManager != null) {
- storageManager(storeName)
- } else {
- warn("No store found for name: %s" format storeName)
- null
- }
- def getTaskName = taskName
- def getSamzaContainerContext = containerContext
+ val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
+ storageManager, jobModel, streamMetadataCache)
- override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
- val startingOffsets = offsetManager.startingOffsets
- offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
- }
-
- override def setUserContext(context: Object): Unit = {
- userContext = context
- }
-
- override def getUserContext: Object = {
- userContext
- }
- }
// store the (ssp -> if this ssp is catched up) mapping. "catched up"
// means the same ssp in other taskInstances have the same offset as
// the one here.
@@ -133,10 +77,6 @@ class TaskInstance(
scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false)
- val inputToTasksMapping = TaskInstance.buildInputToTasks(jobModel)
- var endOfStreamManager: EndOfStreamManager = null
- var watermarkManager: WatermarkManager = null
-
val hasIntermediateStreams = config.getStreamIds.exists(config.getIsIntermediate(_))
def registerMetrics {
@@ -169,22 +109,6 @@ class TaskInstance(
} else {
debug("Skipping task initialization for taskName: %s" format taskName)
}
-
- if (isControlMessageListener) {
- endOfStreamManager = new EndOfStreamManager(taskName.getTaskName,
- task.asInstanceOf[ControlMessageListenerTask],
- inputToTasksMapping,
- systemStreamPartitions.asJava,
- streamMetadataCache,
- collector)
-
- watermarkManager = new WatermarkManager(taskName.getTaskName,
- task.asInstanceOf[ControlMessageListenerTask],
- inputToTasksMapping,
- systemStreamPartitions.asJava,
- streamMetadataCache,
- collector)
- }
}
def registerProducers {
@@ -223,51 +147,20 @@ class TaskInstance(
trace("Processing incoming message envelope for taskName and SSP: %s, %s"
format (taskName, envelope.getSystemStreamPartition))
- MessageType.of(envelope.getMessage) match {
- case MessageType.USER_MESSAGE =>
- if (isAsyncTask) {
- exceptionHandler.maybeHandle {
- val callback = callbackFactory.createCallback()
- task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
- }
- }
- else {
- exceptionHandler.maybeHandle {
- task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
- }
-
- trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
- format(taskName, envelope.getSystemStreamPartition, envelope.getOffset))
-
- offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
- }
+ if (isAsyncTask) {
+ exceptionHandler.maybeHandle {
+ val callback = callbackFactory.createCallback()
+ task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
+ }
+ } else {
+ exceptionHandler.maybeHandle {
+ task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
+ }
- case MessageType.END_OF_STREAM =>
- if (isControlMessageListener) {
- // handle eos synchronously.
- runSync(callbackFactory) {
- endOfStreamManager.update(envelope, coordinator)
- }
- } else {
- warn("Ignore end-of-stream message due to %s not implementing ControlMessageListener."
- format(task.getClass.toString))
- }
+ trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+ format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
- case MessageType.WATERMARK =>
- if (isControlMessageListener) {
- // handle watermark synchronously in the run loop thread.
- // we might consider running it asynchronously later
- runSync(callbackFactory) {
- val watermark = watermarkManager.update(envelope)
- if (watermark != null) {
- val stream = envelope.getSystemStreamPartition.getSystemStream
- task.asInstanceOf[ControlMessageListenerTask].onWatermark(watermark, stream, collector, coordinator)
- }
- }
- } else {
- warn("Ignore watermark message due to %s not implementing ControlMessageListener."
- format(task.getClass.toString))
- }
+ offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
}
}
}
@@ -343,38 +236,32 @@ class TaskInstance(
* it's already catched-up.
*/
private def checkCaughtUp(envelope: IncomingMessageEnvelope) = {
- systemAdmins match {
- case null => {
- warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
- ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
- }
- case others => {
- val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
- .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
- val system = envelope.getSystemStreamPartition.getSystem
- others(system).offsetComparator(envelope.getOffset, startingOffset) match {
- case null => {
- info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
- ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
- }
- case result => {
- if (result >= 0) {
- info(envelope.getSystemStreamPartition.toString + " is catched up.")
- ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+ if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
+ ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+ } else {
+ systemAdmins match {
+ case null => {
+ warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
+ ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+ }
+ case others => {
+ val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
+ .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
+ val system = envelope.getSystemStreamPartition.getSystem
+ others(system).offsetComparator(envelope.getOffset, startingOffset) match {
+ case null => {
+ info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
+ ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
+ }
+ case result => {
+ if (result >= 0) {
+ info(envelope.getSystemStreamPartition.toString + " is catched up.")
+ ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+ }
}
}
}
}
}
}
-
- private def runSync(callbackFactory: TaskCallbackFactory)(runCodeBlock: => Unit) = {
- val callback = callbackFactory.createCallback()
- try {
- runCodeBlock
- callback.complete()
- } catch {
- case t: Throwable => callback.failure(t)
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 0100c78..76594ae 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -20,12 +20,13 @@
package org.apache.samza.serializers
import org.apache.samza.SamzaException
-import org.apache.samza.message.ControlMessage
-import org.apache.samza.message.WatermarkMessage
+import org.apache.samza.system.ControlMessage
import org.apache.samza.system.SystemStream
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.config.StorageConfig
+import org.apache.samza.system.WatermarkMessage
+
class SerdeManager(
serdes: Map[String, Serde[Object]] = Map(),
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java b/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
deleted file mode 100644
index 8351802..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.ControlMessage;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.task.MessageCollector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestControlMessageUtils {
-
- @Test
- public void testSendControlMessage() {
- SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
- Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
- partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
- StreamMetadataCache metadataCache = mock(StreamMetadataCache.class);
- when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
-
- SystemStream systemStream = new SystemStream("test-system", "test-stream");
- Set<Integer> partitions = new HashSet<>();
- MessageCollector collector = mock(MessageCollector.class);
- doAnswer(invocation -> {
- OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
- partitions.add((Integer) envelope.getPartitionKey());
- assertEquals(envelope.getSystemStream(), systemStream);
- return null;
- }).when(collector).send(any());
-
- ControlMessageUtils.sendControlMessage(mock(ControlMessage.class), systemStream, metadataCache, collector);
- assertEquals(partitions.size(), 4);
- }
-
- @Test
- public void testCalculateUpstreamTaskCounts() {
- SystemStream input1 = new SystemStream("test-system", "input-stream-1");
- SystemStream input2 = new SystemStream("test-system", "input-stream-2");
- SystemStream input3 = new SystemStream("test-system", "input-stream-3");
-
- Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
- TaskName t0 = new TaskName("task 0"); //consume input1 and input2
- TaskName t1 = new TaskName("task 1"); //consume input1 and input2 and input 3
- TaskName t2 = new TaskName("task 2"); //consume input2 and input 3
- inputToTasks.put(input1, t0.getTaskName());
- inputToTasks.put(input1, t1.getTaskName());
- inputToTasks.put(input2, t0.getTaskName());
- inputToTasks.put(input2, t1.getTaskName());
- inputToTasks.put(input2, t2.getTaskName());
- inputToTasks.put(input3, t1.getTaskName());
- inputToTasks.put(input3, t2.getTaskName());
-
- StreamSpec inputSpec2 = new StreamSpec("input-stream-2", "input-stream-2", "test-system");
- StreamSpec inputSpec3 = new StreamSpec("input-stream-3", "input-stream-3", "test-system");
- StreamSpec intSpec1 = new StreamSpec("int-stream-1", "int-stream-1", "test-system");
- StreamSpec intSpec2 = new StreamSpec("int-stream-2", "int-stream-2", "test-system");
-
- List<IOGraph.IONode> nodes = new ArrayList<>();
- IOGraph.IONode node = new IOGraph.IONode(intSpec1, true);
- node.addInput(inputSpec2);
- nodes.add(node);
- node = new IOGraph.IONode(intSpec2, true);
- node.addInput(inputSpec3);
- nodes.add(node);
- IOGraph ioGraph = new IOGraph(nodes);
-
- Map<SystemStream, Integer> counts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, ioGraph);
- assertEquals(counts.get(intSpec1.toSystemStream()).intValue(), 3);
- assertEquals(counts.get(intSpec2.toSystemStream()).intValue(), 2);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
deleted file mode 100644
index cc70b6b..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputOperatorSpec;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestEndOfStreamManager {
- StreamMetadataCache metadataCache;
-
- @Before
- public void setup() {
- SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
- Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
- partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
- when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
- metadataCache = mock(StreamMetadataCache.class);
- when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
- }
-
- @Test
- public void testUpdateFromInputSource() {
- SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
- TaskName taskName = new TaskName("Task 0");
- Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
- streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
- ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
- when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
- EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null);
- manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp), mock(TaskCoordinator.class));
- assertTrue(manager.isEndOfStream(ssp.getSystemStream()));
- }
-
- @Test
- public void testUpdateFromIntermediateStream() {
- SystemStreamPartition[] ssps = new SystemStreamPartition[3];
- ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
- ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
- ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
-
- TaskName taskName = new TaskName("Task 0");
- Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
- for (SystemStreamPartition ssp : ssps) {
- streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
- }
- ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
- when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
- EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
-
- int envelopeCount = 4;
- IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount];
- for (int i = 0; i < envelopeCount; i++) {
- envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new EndOfStreamMessage("task " + i, envelopeCount));
- }
- TaskCoordinator coordinator = mock(TaskCoordinator.class);
-
- // verify the first three messages won't result in end-of-stream
- for (int i = 0; i < 3; i++) {
- manager.update(envelopes[i], coordinator);
- assertFalse(manager.isEndOfStream(ssps[0].getSystemStream()));
- }
- // the fourth message will end the stream
- manager.update(envelopes[3], coordinator);
- assertTrue(manager.isEndOfStream(ssps[0].getSystemStream()));
- assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
-
- // stream2 has two partitions assigned to this task, so it requires a message from each partition to end it
- envelopes = new IncomingMessageEnvelope[envelopeCount];
- for (int i = 0; i < envelopeCount; i++) {
- envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
- }
- // verify the messages for the partition 0 won't result in end-of-stream
- for (int i = 0; i < 4; i++) {
- manager.update(envelopes[i], coordinator);
- assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
- }
- for (int i = 0; i < envelopeCount; i++) {
- envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
- }
- for (int i = 0; i < 3; i++) {
- manager.update(envelopes[i], coordinator);
- assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
- }
- // the fourth message will end the stream
- manager.update(envelopes[3], coordinator);
- assertTrue(manager.isEndOfStream(ssps[1].getSystemStream()));
- }
-
- @Test
- public void testUpdateFromIntermediateStreamWith2Tasks() {
- SystemStreamPartition[] ssps0 = new SystemStreamPartition[2];
- ssps0[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
- ssps0[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
-
- SystemStreamPartition ssp1 = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
-
- TaskName t0 = new TaskName("Task 0");
- Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
- for (SystemStreamPartition ssp : ssps0) {
- streamToTasks.put(ssp.getSystemStream(), t0.getTaskName());
- }
-
- TaskName t1 = new TaskName("Task 1");
- streamToTasks.put(ssp1, t1.getTaskName());
-
- List<StreamSpec> inputs = new ArrayList<>();
- inputs.add(new StreamSpec("test-stream-1", "test-stream-1", "test-system"));
- inputs.add(new StreamSpec("test-stream-2", "test-stream-2", "test-system"));
- StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
- IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
- ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
- when(listener.getIOGraph()).thenReturn(ioGraph);
-
- EndOfStreamManager manager0 = spy(new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null));
- manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[0]), mock(TaskCoordinator.class));
- assertTrue(manager0.isEndOfStream(ssps0[0].getSystemStream()));
- doNothing().when(manager0).sendEndOfStream(any(), anyInt());
- manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[1]), mock(TaskCoordinator.class));
- assertTrue(manager0.isEndOfStream(ssps0[1].getSystemStream()));
- verify(manager0).sendEndOfStream(any(), anyInt());
-
- EndOfStreamManager manager1 = spy(new EndOfStreamManager("Task 1", listener, streamToTasks, Collections.singleton(
- ssp1), null, null));
- doNothing().when(manager1).sendEndOfStream(any(), anyInt());
- manager1.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), mock(TaskCoordinator.class));
- assertTrue(manager1.isEndOfStream(ssp1.getSystemStream()));
- verify(manager1).sendEndOfStream(any(), anyInt());
- }
-
- @Test
- public void testSendEndOfStream() {
- StreamSpec ints = new StreamSpec("int-stream", "int-stream", "test-system");
- StreamSpec input = new StreamSpec("input-stream", "input-stream", "test-system");
- IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(Collections.singletonList(input), ints, true);
-
- Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
- for (int i = 0; i < 8; i++) {
- inputToTasks.put(input.toSystemStream(), "Task " + i);
- }
-
- MessageCollector collector = mock(MessageCollector.class);
- TaskName taskName = new TaskName("Task 0");
- ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
- when(listener.getIOGraph()).thenReturn(ioGraph);
- EndOfStreamManager manager = new EndOfStreamManager(taskName.getTaskName(),
- listener,
- inputToTasks,
- Collections.EMPTY_SET,
- metadataCache,
- collector);
-
- Set<Integer> partitions = new HashSet<>();
- doAnswer(invocation -> {
- OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
- partitions.add((Integer) envelope.getPartitionKey());
- EndOfStreamMessage eosMessage = (EndOfStreamMessage) envelope.getMessage();
- assertEquals(eosMessage.getTaskName(), taskName.getTaskName());
- assertEquals(eosMessage.getTaskCount(), 8);
- return null;
- }).when(collector).send(any());
-
- manager.sendEndOfStream(input.toSystemStream(), 8);
- assertEquals(partitions.size(), 4);
- }
-
- @Test
- public void testPropagate() {
- List<StreamSpec> inputs = new ArrayList<>();
- inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
- inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system"));
- StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-
- SystemStream input1 = new SystemStream("test-system", "input-stream-1");
- SystemStream input2 = new SystemStream("test-system", "input-stream-2");
- SystemStream ints = new SystemStream("test-system", "int-stream");
- SystemStreamPartition[] ssps = new SystemStreamPartition[3];
- ssps[0] = new SystemStreamPartition(input1, new Partition(0));
- ssps[1] = new SystemStreamPartition(input2, new Partition(0));
- ssps[2] = new SystemStreamPartition(ints, new Partition(0));
-
- Set<SystemStreamPartition> sspSet = new HashSet<>(Arrays.asList(ssps));
- TaskName taskName = new TaskName("task 0");
- Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
- for (SystemStreamPartition ssp : ssps) {
- streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
- }
-
- IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
- MessageCollector collector = mock(MessageCollector.class);
- ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
- when(listener.getIOGraph()).thenReturn(ioGraph);
- EndOfStreamManager manager = spy(
- new EndOfStreamManager("task 0", listener, streamToTasks, sspSet, metadataCache, collector));
- TaskCoordinator coordinator = mock(TaskCoordinator.class);
-
- // ssp1 end-of-stream, wait for ssp2
- manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[0]), coordinator);
- verify(manager, never()).sendEndOfStream(any(), anyInt());
-
- // ssp2 end-of-stream, propagate to intermediate
- manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[1]), coordinator);
- doNothing().when(manager).sendEndOfStream(any(), anyInt());
- ArgumentCaptor<SystemStream> argument = ArgumentCaptor.forClass(SystemStream.class);
- verify(manager).sendEndOfStream(argument.capture(), anyInt());
- assertEquals(ints, argument.getValue());
-
- // intermediate end-of-stream, shutdown the task
- manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[2]), coordinator);
- doNothing().when(coordinator).shutdown(any());
- ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
- verify(coordinator).shutdown(arg.capture());
- assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
- }
-
- // Test the case when the publishing tasks to intermediate stream is a subset of total tasks
- @Test
- public void testPropogateWith2Tasks() {
- StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
- OutputStreamImpl outputStream = new OutputStreamImpl(outputSpec, null, null);
- OutputOperatorSpec partitionByOp = OperatorSpecs.createPartitionByOperatorSpec(outputStream, 0);
-
- List<StreamSpec> inputs = new ArrayList<>();
- inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
-
- IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
- SystemStream input1 = new SystemStream("test-system", "input-stream-1");
- SystemStream ints = new SystemStream("test-system", "int-stream");
- SystemStreamPartition ssp1 = new SystemStreamPartition(input1, new Partition(0));
- SystemStreamPartition ssp2 = new SystemStreamPartition(ints, new Partition(0));
-
- TaskName t0 = new TaskName("task 0");
- TaskName t1 = new TaskName("task 1");
- Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
- streamToTasks.put(ssp1.getSystemStream(), t0.getTaskName());
- streamToTasks.put(ssp2.getSystemStream(), t1.getTaskName());
-
- ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
- when(listener.getIOGraph()).thenReturn(ioGraph);
-
- EndOfStreamManager manager0 = spy(
- new EndOfStreamManager(t0.getTaskName(), listener, streamToTasks, Collections.singleton(ssp1), metadataCache, null));
- EndOfStreamManager manager1 = spy(
- new EndOfStreamManager(t1.getTaskName(), listener, streamToTasks, Collections.singleton(ssp2), metadataCache, null));
-
- TaskCoordinator coordinator0 = mock(TaskCoordinator.class);
- TaskCoordinator coordinator1 = mock(TaskCoordinator.class);
-
- // ssp1 end-of-stream
- doNothing().when(manager0).sendEndOfStream(any(), anyInt());
- doNothing().when(coordinator0).shutdown(any());
- manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), coordinator0);
- //verify task count is 1
- ArgumentCaptor<Integer> argument = ArgumentCaptor.forClass(Integer.class);
- verify(manager0).sendEndOfStream(any(), argument.capture());
- assertTrue(argument.getValue() == 1);
- ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
- verify(coordinator0).shutdown(arg.capture());
- assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
-
- // int1 end-of-stream
- IncomingMessageEnvelope intEos = new IncomingMessageEnvelope(ssp2, null, null, new EndOfStreamMessage(t0.getTaskName(), 1));
- manager1.update(intEos, coordinator1);
- doNothing().when(coordinator1).shutdown(any());
- verify(manager1, never()).sendEndOfStream(any(), anyInt());
- arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
- verify(coordinator1).shutdown(arg.capture());
- assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
deleted file mode 100644
index 39c56c3..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.control.IOGraph.IONode;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestIOGraph {
- StreamSpec input1;
- StreamSpec input2;
- StreamSpec input3;
- StreamSpec output1;
- StreamSpec output2;
- StreamSpec int1;
- StreamSpec int2;
-
- StreamGraphImpl streamGraph;
-
- @Before
- public void setup() {
- ApplicationRunner runner = mock(ApplicationRunner.class);
- Map<String, String> configMap = new HashMap<>();
- configMap.put(JobConfig.JOB_NAME(), "test-app");
- configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
- Config config = new MapConfig(configMap);
-
- /**
- * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
- *
- * input1 -> map -> join -> output1
- * |
- * input2 -> partitionBy -> filter -|
- * |
- * input3 -> filter -> partitionBy -> map -> join -> output2
- *
- */
- input1 = new StreamSpec("input1", "input1", "system1");
- input2 = new StreamSpec("input2", "input2", "system2");
- input3 = new StreamSpec("input3", "input3", "system2");
-
- output1 = new StreamSpec("output1", "output1", "system1");
- output2 = new StreamSpec("output2", "output2", "system2");
-
- runner = mock(ApplicationRunner.class);
- when(runner.getStreamSpec("input1")).thenReturn(input1);
- when(runner.getStreamSpec("input2")).thenReturn(input2);
- when(runner.getStreamSpec("input3")).thenReturn(input3);
- when(runner.getStreamSpec("output1")).thenReturn(output1);
- when(runner.getStreamSpec("output2")).thenReturn(output2);
-
- // intermediate streams used in tests
- int1 = new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system");
- int2 = new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system");
- when(runner.getStreamSpec("test-app-1-partition_by-3"))
- .thenReturn(int1);
- when(runner.getStreamSpec("test-app-1-partition_by-8"))
- .thenReturn(int2);
-
- streamGraph = new StreamGraphImpl(runner, config);
- BiFunction msgBuilder = mock(BiFunction.class);
- MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
-
- m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(om1);
- m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
- }
-
- @Test
- public void testBuildIOGraph() {
- IOGraph ioGraph = streamGraph.toIOGraph();
- assertEquals(ioGraph.getNodes().size(), 4);
-
- for (IONode node : ioGraph.getNodes()) {
- if (node.getOutput().equals(output1)) {
- assertEquals(node.getInputs().size(), 2);
- assertFalse(node.isOutputIntermediate());
- StreamSpec[] inputs = sort(node.getInputs());
- assertEquals(inputs[0], input1);
- assertEquals(inputs[1], int1);
- } else if (node.getOutput().equals(output2)) {
- assertEquals(node.getInputs().size(), 2);
- assertFalse(node.isOutputIntermediate());
- StreamSpec[] inputs = sort(node.getInputs());
- assertEquals(inputs[0], int1);
- assertEquals(inputs[1], int2);
- } else if (node.getOutput().equals(int1)) {
- assertEquals(node.getInputs().size(), 1);
- assertTrue(node.isOutputIntermediate());
- StreamSpec[] inputs = sort(node.getInputs());
- assertEquals(inputs[0], input2);
- } else if (node.getOutput().equals(int2)) {
- assertEquals(node.getInputs().size(), 1);
- assertTrue(node.isOutputIntermediate());
- StreamSpec[] inputs = sort(node.getInputs());
- assertEquals(inputs[0], input3);
- }
- }
- }
-
- @Test
- public void testNodesOfInput() {
- IOGraph ioGraph = streamGraph.toIOGraph();
- Collection<IONode> nodes = ioGraph.getNodesOfInput(input1.toSystemStream());
- assertEquals(nodes.size(), 1);
- IONode node = nodes.iterator().next();
- assertEquals(node.getOutput(), output1);
- assertEquals(node.getInputs().size(), 2);
- assertFalse(node.isOutputIntermediate());
-
- nodes = ioGraph.getNodesOfInput(input2.toSystemStream());
- assertEquals(nodes.size(), 1);
- node = nodes.iterator().next();
- assertEquals(node.getOutput(), int1);
- assertEquals(node.getInputs().size(), 1);
- assertTrue(node.isOutputIntermediate());
-
- nodes = ioGraph.getNodesOfInput(int1.toSystemStream());
- assertEquals(nodes.size(), 2);
- nodes.forEach(n -> {
- assertEquals(n.getInputs().size(), 2);
- });
-
- nodes = ioGraph.getNodesOfInput(input3.toSystemStream());
- assertEquals(nodes.size(), 1);
- node = nodes.iterator().next();
- assertEquals(node.getOutput(), int2);
- assertEquals(node.getInputs().size(), 1);
- assertTrue(node.isOutputIntermediate());
-
- nodes = ioGraph.getNodesOfInput(int2.toSystemStream());
- assertEquals(nodes.size(), 1);
- node = nodes.iterator().next();
- assertEquals(node.getOutput(), output2);
- assertEquals(node.getInputs().size(), 2);
- assertFalse(node.isOutputIntermediate());
- }
-
- private static StreamSpec[] sort(Set<StreamSpec> specs) {
- StreamSpec[] array = new StreamSpec[specs.size()];
- specs.toArray(array);
- Arrays.sort(array, (s1, s2) -> s1.getId().compareTo(s2.getId()));
- return array;
- }
-
- public static IOGraph buildSimpleIOGraph(List<StreamSpec> inputs,
- StreamSpec output,
- boolean isOutputIntermediate) {
- IONode node = new IONode(output, isOutputIntermediate);
- inputs.forEach(input -> node.addInput(input));
- return new IOGraph(Collections.singleton(node));
- }
-}