You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/06/05 17:27:28 UTC
samza git commit: SAMZA-1271; Guarantee predictable, deterministic order for operator initialization and finalization
Repository: samza
Updated Branches:
refs/heads/master a4174309a -> ad8ba96e7
SAMZA-1271; Guarantee predictable, deterministic order for operator initialization and finalization
Currently, the order in which operators are initialized in the Samza high-level API is not deterministic. The non-determinism arises from two primary causes:
- There is no fixed iteration order over the `OperatorSpec`s subscribed to a given `MessageStream`.
- There is no fixed iteration order over the `OperatorImpl`s in the `OperatorImplGraph`.
This patch aims to provide the following two guarantees.
For any two operators A and B in the graph, if B consumes the output of A:
- A is initialized before B is initialized
- A is finalized only after B is finalized
Author: vjagadish1989 <jv...@linkedin.com>
Reviewers: Prateek Maheshwari<pm...@linkedin.com>
Closes #211 from vjagadish1989/deterministic_order
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad8ba96e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad8ba96e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad8ba96e
Branch: refs/heads/master
Commit: ad8ba96e7ee2e98fe62fe0af07cb93e6153c5134
Parents: a417430
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Mon Jun 5 10:27:23 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon Jun 5 10:27:23 2017 -0700
----------------------------------------------------------------------
.../operators/functions/ClosableFunction.java | 4 +
.../operators/functions/InitableFunction.java | 5 +
.../samza/operators/MessageStreamImpl.java | 6 +-
.../apache/samza/operators/StreamGraphImpl.java | 7 +-
.../samza/operators/impl/OperatorImplGraph.java | 19 ++--
.../samza/operators/spec/OperatorSpecs.java | 11 ++
.../apache/samza/task/StreamOperatorTask.java | 6 +-
.../samza/operators/TestStreamGraphImpl.java | 29 +++++
.../operators/impl/TestOperatorImplGraph.java | 111 +++++++++++++++++++
9 files changed, 180 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
index 2e73652..fe7137f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
@@ -26,6 +26,10 @@ import org.apache.samza.annotation.InterfaceStability;
*
* <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
*
+ * <p> Order of finalization: {@link ClosableFunction}s are invoked in the reverse topological order of operators in the
+ * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results
+ * from operator A, then operator B is guaranteed to be closed before operator A.
+ *
*/
@InterfaceStability.Unstable
public interface ClosableFunction {
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
index 4f9fad7..b08c6cd 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -25,6 +25,11 @@ import org.apache.samza.task.TaskContext;
/**
* A function that can be initialized before execution.
+ *
+ * <p> Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the
+ * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results
+ * from operator A, then operator A is guaranteed to be initialized before operator B.
+ *
*/
@InterfaceStability.Unstable
public interface InitableFunction {
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 0c84e90..9912f95 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -41,7 +41,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
@@ -61,8 +61,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
/**
* The set of operators that consume the messages in this {@link MessageStream}
+ *
+ * Use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
*/
- private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
+ private final Set<OperatorSpec> registeredOperatorSpecs = new LinkedHashSet<>();
/**
* Default constructor
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 1f1d282..fcce5eb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -31,8 +31,8 @@ import org.apache.samza.system.StreamSpec;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
@@ -51,8 +51,9 @@ public class StreamGraphImpl implements StreamGraph {
*/
private int opId = 0;
- private final Map<StreamSpec, InputStreamInternal> inStreams = new HashMap<>();
- private final Map<StreamSpec, OutputStreamInternal> outStreams = new HashMap<>();
+ // Using LHM for deterministic order in initializing and closing operators.
+ private final Map<StreamSpec, InputStreamInternal> inStreams = new LinkedHashMap<>();
+ private final Map<StreamSpec, OutputStreamInternal> outStreams = new LinkedHashMap<>();
private final ApplicationRunner runner;
private final Config config;
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 78a6d1e..e99b3ee 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.impl;
+import com.google.common.collect.Lists;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
@@ -31,9 +32,12 @@ import org.apache.samza.task.TaskContext;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
@@ -47,8 +51,10 @@ public class OperatorImplGraph {
* A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating
* multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different
* input {@link MessageStreamImpl}s.
+ *
+ * Using LHM for deterministic ordering in initializing and closing operators.
*/
- private final Map<OperatorSpec, OperatorImpl> operatorImpls = new HashMap<>();
+ private final Map<OperatorSpec, OperatorImpl> operatorImpls = new LinkedHashMap<>();
/**
* A mapping from input {@link SystemStream}s to their {@link OperatorImpl} sub-DAG in this graph.
@@ -99,13 +105,10 @@ public class OperatorImplGraph {
return Collections.unmodifiableCollection(this.rootOperators.values());
}
- /**
- * Get all {@link OperatorImpl}s for the graph.
- *
- * @return an unmodifiable view of all {@link OperatorImpl}s for the graph
- */
- public Collection<OperatorImpl> getAllOperators() {
- return Collections.unmodifiableCollection(this.operatorImpls.values());
+ public void close() {
+ List<OperatorImpl> initializationOrder = new ArrayList<>(operatorImpls.values());
+ List<OperatorImpl> finalizationOrder = Lists.reverse(initializationOrder);
+ finalizationOrder.forEach(OperatorImpl::close);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 0b93bbe..66e2c58 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -71,6 +71,11 @@ public class OperatorSpecs {
public void init(Config config, TaskContext context) {
mapFn.init(config, context);
}
+
+ @Override
+ public void close() {
+ mapFn.close();
+ }
}, nextStream, OperatorSpec.OpCode.MAP, opId);
}
@@ -101,6 +106,12 @@ public class OperatorSpecs {
public void init(Config config, TaskContext context) {
filterFn.init(config, context);
}
+
+ @Override
+ public void close() {
+ filterFn.close();
+ }
+
}, nextStream, OperatorSpec.OpCode.FILTER, opId);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index a5f3f85..50ae775 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -22,7 +22,6 @@ import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.impl.OperatorImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
import org.apache.samza.operators.impl.RootOperatorImpl;
import org.apache.samza.operators.stream.InputStreamInternal;
@@ -32,7 +31,6 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -142,8 +140,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
if (this.contextManager != null) {
this.contextManager.close();
}
-
- Collection<OperatorImpl> allOperators = operatorImplGraph.getAllOperators();
- allOperators.forEach(OperatorImpl::close);
+ operatorImplGraph.close();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 666bbb8..9d95217 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -18,11 +18,13 @@
*/
package org.apache.samza.operators;
+import junit.framework.Assert;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.data.MessageType;
import org.apache.samza.operators.data.TestInputMessageEnvelope;
import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.stream.InputStreamInternal;
import org.apache.samza.operators.stream.InputStreamInternalImpl;
import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
import org.apache.samza.operators.stream.OutputStreamInternalImpl;
@@ -30,6 +32,7 @@ import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -193,4 +196,30 @@ public class TestStreamGraphImpl {
assertEquals(graph.getNextOpId(), 1);
}
+ @Test
+ public void testGetInputStreamPreservesInsertionOrder() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+ StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
+
+ StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+
+ StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
+
+ graph.getInputStream("test-stream-1", (k, v) -> v);
+ graph.getInputStream("test-stream-2", (k, v) -> v);
+ graph.getInputStream("test-stream-3", (k, v) -> v);
+
+ ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values());
+ Assert.assertEquals(inputMessageStreams.size(), 3);
+ Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1);
+ Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2);
+ Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ad8ba96e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
new file mode 100644
index 0000000..67e5b46
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.SystemClock;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestOperatorImplGraph {
+
+ @Test
+ public void testOperatorGraphInitAndClose() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
+ StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = createMockContext();
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+ List<String> initializationOrder = new ArrayList<>();
+ List<String> finalizationOrder = new ArrayList<>();
+
+ MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v);
+ MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v);
+
+ inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder))
+ .map(createMapFunction("2", initializationOrder, finalizationOrder));
+
+ inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder))
+ .map(createMapFunction("4", initializationOrder, finalizationOrder));
+
+ OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance());
+
+ // Assert that initialization occurs in topological order.
+ implGraph.init(graph, mockConfig, mockContext);
+ assertEquals(initializationOrder.get(0), "1");
+ assertEquals(initializationOrder.get(1), "2");
+ assertEquals(initializationOrder.get(2), "3");
+ assertEquals(initializationOrder.get(3), "4");
+
+ // Assert that finalization occurs in reverse topological order.
+ implGraph.close();
+ assertEquals(finalizationOrder.get(0), "4");
+ assertEquals(finalizationOrder.get(1), "3");
+ assertEquals(finalizationOrder.get(2), "2");
+ assertEquals(finalizationOrder.get(3), "1");
+ }
+
+ private TaskContext createMockContext() {
+ TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ return mockContext;
+ }
+
+ /**
+ * Creates an identity map function that appends to the provided lists when init/close is invoked.
+ */
+ private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) {
+ return new MapFunction<Object, Object>() {
+ @Override
+ public void init(Config config, TaskContext context) {
+ initializationOrder.add(id);
+ }
+
+ @Override
+ public void close() {
+ finalizationOrder.add(id);
+ }
+
+ @Override
+ public Object apply(Object message) {
+ return message;
+ }
+ };
+ }
+}
+