You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 02:23:08 UTC
[1/2] samza git commit: Application Descriptor implementation cleanup.
Repository: samza
Updated Branches:
refs/heads/master 74675cea5 -> ef3599721
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index 64609b0..04ea7e3 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -19,12 +19,14 @@
package org.apache.samza.operators.spec;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.Scheduler;
@@ -34,7 +36,6 @@ import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
-import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -42,63 +43,18 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* Unit tests for partitionBy operator
*/
public class TestPartitionByOperatorSpec {
-
- private final Config mockConfig = mock(Config.class);
- private final GenericInputDescriptor testinputDescriptor =
- new GenericSystemDescriptor("mockSystem", "mockFactoryClassName")
- .getInputDescriptor("test-input-1", mock(Serde.class));
private final String testJobName = "testJob";
private final String testJobId = "1";
private final String testRepartitionedStreamName = "parByKey";
-
- class ScheduledMapFn implements MapFunction<Object, String>, ScheduledFunction<String, Object> {
-
- @Override
- public String apply(Object message) {
- return message.toString();
- }
-
- @Override
- public void schedule(Scheduler<String> scheduler) {
-
- }
-
- @Override
- public Collection<Object> onCallback(String key, long timestamp) {
- return null;
- }
- }
-
- class WatermarkMapFn implements MapFunction<Object, String>, WatermarkFunction<Object> {
-
- @Override
- public String apply(Object message) {
- return message.toString();
- }
-
- @Override
- public Collection<Object> processWatermark(long watermark) {
- return null;
- }
-
- @Override
- public Long getOutputWatermark() {
- return null;
- }
- }
-
- @Before
- public void setup() {
- when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn(testJobName);
- when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn(testJobId);
- }
+ private final GenericInputDescriptor testInputDescriptor =
+ new GenericSystemDescriptor("mockSystem", "mockFactoryClassName")
+ .getInputDescriptor("test-input-1", mock(Serde.class));
@Test
public void testPartitionBy() {
@@ -106,9 +62,9 @@ public class TestPartitionByOperatorSpec {
MapFunction<Object, Object> valueFn = m -> m;
KVSerde<Object, Object> partitionBySerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream inputStream = appDesc.getInputStream(testinputDescriptor);
+ MessageStream inputStream = appDesc.getInputStream(testInputDescriptor);
inputStream.partitionBy(keyFn, valueFn, partitionBySerde, testRepartitionedStreamName);
- }, mockConfig);
+ }, getConfig());
assertEquals(2, streamAppDesc.getInputOperators().size());
Map<String, InputOperatorSpec> inputOpSpecs = streamAppDesc.getInputOperators();
assertTrue(inputOpSpecs.keySet().contains(String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName)));
@@ -119,7 +75,7 @@ public class TestPartitionByOperatorSpec {
assertTrue(inputOpSpec.isKeyed());
assertNull(inputOpSpec.getScheduledFn());
assertNull(inputOpSpec.getWatermarkFn());
- InputOperatorSpec originInputSpec = inputOpSpecs.get(testinputDescriptor.getStreamId());
+ InputOperatorSpec originInputSpec = inputOpSpecs.get(testInputDescriptor.getStreamId());
assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
PartitionByOperatorSpec reparOpSpec = (PartitionByOperatorSpec) originInputSpec.getRegisteredOperatorSpecs().toArray()[0];
assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName));
@@ -135,9 +91,9 @@ public class TestPartitionByOperatorSpec {
MapFunction<Object, String> keyFn = m -> m.toString();
MapFunction<Object, Object> valueFn = m -> m;
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream inputStream = appDesc.getInputStream(testinputDescriptor);
+ MessageStream inputStream = appDesc.getInputStream(testInputDescriptor);
inputStream.partitionBy(keyFn, valueFn, mock(KVSerde.class), testRepartitionedStreamName);
- }, mockConfig);
+ }, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(
String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName));
assertNotNull(inputOpSpec);
@@ -146,7 +102,7 @@ public class TestPartitionByOperatorSpec {
assertTrue(inputOpSpec.isKeyed());
assertNull(inputOpSpec.getScheduledFn());
assertNull(inputOpSpec.getWatermarkFn());
- InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId());
+ InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testInputDescriptor.getStreamId());
assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
PartitionByOperatorSpec reparOpSpec = (PartitionByOperatorSpec) originInputSpec.getRegisteredOperatorSpecs().toArray()[0];
assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", testJobName, testJobId, testRepartitionedStreamName));
@@ -160,9 +116,9 @@ public class TestPartitionByOperatorSpec {
@Test
public void testCopy() {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream inputStream = appDesc.getInputStream(testinputDescriptor);
+ MessageStream inputStream = appDesc.getInputStream(testInputDescriptor);
inputStream.partitionBy(m -> m.toString(), m -> m, mock(KVSerde.class), testRepartitionedStreamName);
- }, mockConfig);
+ }, getConfig());
OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
OperatorSpecGraph clonedGraph = specGraph.clone();
OperatorSpecTestUtils.assertClonedGraph(specGraph, clonedGraph);
@@ -172,35 +128,77 @@ public class TestPartitionByOperatorSpec {
public void testScheduledFunctionAsKeyFn() {
ScheduledMapFn keyFn = new ScheduledMapFn();
new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
+ MessageStream<Object> inputStream = appDesc.getInputStream(testInputDescriptor);
inputStream.partitionBy(keyFn, m -> m, mock(KVSerde.class), "parByKey");
- }, mockConfig);
+ }, getConfig());
}
@Test(expected = IllegalArgumentException.class)
public void testWatermarkFunctionAsKeyFn() {
WatermarkMapFn keyFn = new WatermarkMapFn();
new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
+ MessageStream<Object> inputStream = appDesc.getInputStream(testInputDescriptor);
inputStream.partitionBy(keyFn, m -> m, mock(KVSerde.class), "parByKey");
- }, mockConfig);
+ }, getConfig());
}
@Test(expected = IllegalArgumentException.class)
public void testScheduledFunctionAsValueFn() {
ScheduledMapFn valueFn = new ScheduledMapFn();
new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
+ MessageStream<Object> inputStream = appDesc.getInputStream(testInputDescriptor);
inputStream.partitionBy(m -> m.toString(), valueFn, mock(KVSerde.class), "parByKey");
- }, mockConfig);
+ }, getConfig());
}
@Test(expected = IllegalArgumentException.class)
public void testWatermarkFunctionAsValueFn() {
WatermarkMapFn valueFn = new WatermarkMapFn();
new StreamApplicationDescriptorImpl(appDesc -> {
- MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
+ MessageStream<Object> inputStream = appDesc.getInputStream(testInputDescriptor);
inputStream.partitionBy(m -> m.toString(), valueFn, mock(KVSerde.class), "parByKey");
- }, mockConfig);
+ }, getConfig());
+ }
+
+ private Config getConfig() {
+ HashMap<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME(), testJobName);
+ configMap.put(JobConfig.JOB_ID(), testJobId);
+ return new MapConfig(configMap);
+ }
+
+ class ScheduledMapFn implements MapFunction<Object, String>, ScheduledFunction<String, Object> {
+ @Override
+ public String apply(Object message) {
+ return message.toString();
+ }
+
+ @Override
+ public void schedule(Scheduler<String> scheduler) {
+
+ }
+
+ @Override
+ public Collection<Object> onCallback(String key, long timestamp) {
+ return null;
+ }
+ }
+
+ class WatermarkMapFn implements MapFunction<Object, String>, WatermarkFunction<Object> {
+
+ @Override
+ public String apply(Object message) {
+ return message.toString();
+ }
+
+ @Override
+ public Collection<Object> processWatermark(long watermark) {
+ return null;
+ }
+
+ @Override
+ public Long getOutputWatermark() {
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index ba62691..8431f57 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,7 +19,6 @@
package org.apache.samza.test.controlmessages;
-import org.apache.samza.application.SamzaApplication;
import scala.collection.JavaConverters;
import java.lang.reflect.Field;
@@ -30,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
[2/2] samza git commit: Application Descriptor implementation cleanup.
Posted by pm...@apache.org.
Application Descriptor implementation cleanup.
Major changes:
1. Move withDefaultSystem to the base ApplicationDescriptor interface so that it can be used for TaskApplication as well.
2. Move some of the common state (input/output/system/table descriptors) and corresponding helper methods from the StreamApplicationDescriptorImpl/TaskApplicationDescriptorImpl to the ApplicationDescriptorImpl base class.
Author: Prateek Maheshwari <pm...@apache.org>
Reviewers: Cameron Lee <ca...@linkedin.com>, Yi Pan <ni...@gmail.com>
Closes #702 from prateekm/app-descriptor-cleanup
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ef359972
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ef359972
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ef359972
Branch: refs/heads/master
Commit: ef359972161e6c12f9b4eb16d9c1a34446daa1cb
Parents: 74675ce
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Fri Oct 12 19:22:54 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Fri Oct 12 19:22:54 2018 -0700
----------------------------------------------------------------------
.../descriptors/ApplicationDescriptor.java | 12 +
.../StreamApplicationDescriptor.java | 12 -
.../descriptors/ApplicationDescriptorImpl.java | 178 ++++++++----
.../StreamApplicationDescriptorImpl.java | 185 ++-----------
.../TaskApplicationDescriptorImpl.java | 76 +-----
.../org/apache/samza/execution/JobGraph.java | 2 +-
.../samza/operators/OperatorSpecGraph.java | 8 +-
.../TestStreamApplicationDescriptorImpl.java | 116 ++++----
.../samza/execution/TestExecutionPlanner.java | 4 +-
.../operators/impl/TestOperatorImplGraph.java | 269 ++++++++++---------
.../spec/TestPartitionByOperatorSpec.java | 128 +++++----
.../WatermarkIntegrationTest.java | 2 +-
12 files changed, 441 insertions(+), 551 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
index b1e78b0..b2d54ca 100644
--- a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.system.descriptors.SystemDescriptor;
/**
@@ -47,6 +48,17 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
Config getConfig();
/**
+ * Sets the default SystemDescriptor to use for the application. This is equivalent to setting
+ * {@code job.default.system} and its properties in configuration.
+ * <p>
+ * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
+ *
+ * @param defaultSystemDescriptor the default system descriptor to use
+ * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
+ */
+ S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
+
+ /**
* Sets the {@link ApplicationContainerContextFactory} for this application. Each task will be given access to a
* different instance of the {@link org.apache.samza.context.ApplicationContainerContext} that this creates. The
* context can be accessed through the {@link org.apache.samza.context.Context}.
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
index 383e9ce..3a35054 100644
--- a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
@@ -25,7 +25,6 @@ import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.table.Table;
@@ -36,17 +35,6 @@ import org.apache.samza.table.Table;
public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
/**
- * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting
- * {@code job.default.system} and its properties in configuration.
- * <p>
- * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
- *
- * @param defaultSystemDescriptor the default system descriptor to use
- * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
- */
- StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
-
- /**
* Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}.
* <p>
* A {@code MessageStream<KV<K, V>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
index f3c34a9..743f9dc 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -18,13 +18,19 @@
*/
package org.apache.samza.application.descriptors;
+import com.google.common.base.Preconditions;
+
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.samza.application.SamzaApplication;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.Config;
import org.apache.samza.context.ApplicationContainerContext;
import org.apache.samza.context.ApplicationContainerContextFactory;
@@ -34,6 +40,7 @@ import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -56,17 +63,25 @@ import org.slf4j.LoggerFactory;
* @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either
* {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor}
*/
-public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
- implements ApplicationDescriptor<S> {
+public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> implements ApplicationDescriptor<S> {
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
+ static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
private final Class<? extends SamzaApplication> appClass;
+ private final Config config;
+
+ // We use LinkedHashMaps (LHMs) for deterministic order in initializing and closing operators.
+ private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+ private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+ private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+ private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+ private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
+
private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
// serdes used by input/output/intermediate streams, keyed by streamId
private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
// serdes used by tables, keyed by tableId
private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
- final Config config;
private Optional<ApplicationContainerContextFactory<?>> applicationContainerContextFactoryOptional = Optional.empty();
private Optional<ApplicationTaskContextFactory<?>> applicationTaskContextFactoryOptional = Optional.empty();
@@ -80,30 +95,41 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
}
@Override
- public Config getConfig() {
+ public final Config getConfig() {
return config;
}
@Override
- public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
+ public final S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+ Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
+ Preconditions.checkState(getInputStreamIds().isEmpty() && getOutputStreamIds().isEmpty(),
+ "Default system must be set before creating any input or output streams.");
+ addSystemDescriptor(defaultSystemDescriptor);
+
+ defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
+ return (S) this;
+ }
+
+ @Override
+ public final S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
this.applicationContainerContextFactoryOptional = Optional.of(factory);
return (S) this;
}
@Override
- public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
+ public final S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
this.applicationTaskContextFactoryOptional = Optional.of(factory);
return (S) this;
}
@Override
- public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
+ public final S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
this.listenerFactory = listenerFactory;
return (S) this;
}
@Override
- public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
+ public final S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
this.reporterFactories.clear();
this.reporterFactories.putAll(reporterFactories);
return (S) this;
@@ -161,93 +187,106 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
}
/**
- * Get the default {@link SystemDescriptor} in this application
+ * Get all the unique input streamIds in this application, including any intermediate streams.
*
- * @return the default {@link SystemDescriptor}
+ * @return an immutable set of input streamIds
*/
- public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
- // default is not set
- return Optional.empty();
+ public Set<String> getInputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
}
/**
- * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+ * Get all the unique output streamIds in this application, including any intermediate streams.
*
- * @param streamId id of the stream
- * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+ * @return an immutable set of output streamIds
*/
- public KV<Serde, Serde> getStreamSerdes(String streamId) {
- return streamSerdes.get(streamId);
+ public Set<String> getOutputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
}
/**
- * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+ * Get all the intermediate broadcast streamIds for this application
*
- * @param tableId id of the table
- * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+ * @return an immutable set of streamIds
*/
- public KV<Serde, Serde> getTableSerdes(String tableId) {
- return tableSerdes.get(tableId);
+ public Set<String> getIntermediateBroadcastStreamIds() {
+ return Collections.emptySet();
}
/**
- * Get the map of all {@link InputOperatorSpec}s in this applicaiton
+ * Get all the {@link InputDescriptor}s to this application
*
- * @return an immutable map from streamId to {@link InputOperatorSpec}. Default to empty map for low-level
- * {@link org.apache.samza.application.TaskApplication}
+ * @return an immutable map of streamId to {@link InputDescriptor}
*/
- public Map<String, InputOperatorSpec> getInputOperators() {
- return Collections.EMPTY_MAP;
+ public Map<String, InputDescriptor> getInputDescriptors() {
+ return Collections.unmodifiableMap(inputDescriptors);
}
/**
- * Get all the {@link InputDescriptor}s to this application
+ * Get all the {@link OutputDescriptor}s for this application
*
- * @return an immutable map of streamId to {@link InputDescriptor}
+ * @return an immutable map of streamId to {@link OutputDescriptor}
*/
- public abstract Map<String, InputDescriptor> getInputDescriptors();
+ public Map<String, OutputDescriptor> getOutputDescriptors() {
+ return Collections.unmodifiableMap(outputDescriptors);
+ }
/**
- * Get all the {@link OutputDescriptor}s from this application
+ * Get all the {@link SystemDescriptor}s in this application
*
- * @return an immutable map of streamId to {@link OutputDescriptor}
+ * @return an immutable set of {@link SystemDescriptor}s
*/
- public abstract Map<String, OutputDescriptor> getOutputDescriptors();
+ public Set<SystemDescriptor> getSystemDescriptors() {
+ return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+ }
/**
- * Get all the broadcast streamIds from this application
+ * Get the default {@link SystemDescriptor} in this application
*
- * @return an immutable set of streamIds
+ * @return the default {@link SystemDescriptor}, or an empty {@link Optional} if no default system has been set
*/
- public abstract Set<String> getBroadcastStreams();
+ public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+ return defaultSystemDescriptorOptional;
+ }
/**
* Get all the {@link TableDescriptor}s in this application
*
* @return an immutable set of {@link TableDescriptor}s
*/
- public abstract Set<TableDescriptor> getTableDescriptors();
+ public Set<TableDescriptor> getTableDescriptors() {
+ return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+ }
/**
- * Get all the unique {@link SystemDescriptor}s in this application
+ * Get a map of all {@link InputOperatorSpec}s in this application
*
- * @return an immutable set of {@link SystemDescriptor}s
+ * @return an immutable map from streamId to {@link InputOperatorSpec}. Defaults to an empty map for low-level
+ * {@link org.apache.samza.application.TaskApplication}
*/
- public abstract Set<SystemDescriptor> getSystemDescriptors();
+ public Map<String, InputOperatorSpec> getInputOperators() {
+ return Collections.emptyMap();
+ }
/**
- * Get all the unique input streamIds in this application
+ * Get the corresponding {@link KVSerde} for the stream {@code streamId}
*
- * @return an immutable set of input streamIds
+ * @param streamId id of the stream
+ * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
*/
- public abstract Set<String> getInputStreamIds();
+ public KV<Serde, Serde> getStreamSerdes(String streamId) {
+ return streamSerdes.get(streamId);
+ }
/**
- * Get all the unique output streamIds in this application
+ * Get the corresponding {@link KVSerde} for the table {@code tableId}
*
- * @return an immutable set of output streamIds
+ * @param tableId id of the table
+ * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
*/
- public abstract Set<String> getOutputStreamIds();
+ public KV<Serde, Serde> getTableSerdes(String tableId) {
+ return tableSerdes.get(tableId);
+ }
KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
Serde keySerde, valueSerde;
@@ -273,7 +312,7 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
}
streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
} else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
- throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
+ throw new IllegalArgumentException(String.format("Serde for streamId: %s is already defined. Cannot change it to "
+ "different serdes.", streamId));
}
return streamSerdes.get(streamId);
@@ -297,4 +336,47 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
return streamSerdes.get(tableId);
}
+ final void addInputDescriptor(InputDescriptor inputDescriptor) {
+ String streamId = inputDescriptor.getStreamId();
+ Preconditions.checkState(!inputDescriptors.containsKey(streamId)
+ || inputDescriptors.get(streamId) == inputDescriptor,
+ String.format("Cannot add multiple input descriptors with the same streamId: %s", streamId));
+ inputDescriptors.put(streamId, inputDescriptor);
+ addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+ }
+
+ final void addOutputDescriptor(OutputDescriptor outputDescriptor) {
+ String streamId = outputDescriptor.getStreamId();
+ Preconditions.checkState(!outputDescriptors.containsKey(streamId)
+ || outputDescriptors.get(streamId) == outputDescriptor,
+ String.format("Cannot add an output descriptor multiple times with the same streamId: %s", streamId));
+ outputDescriptors.put(streamId, outputDescriptor);
+ addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+ }
+
+ final void addTableDescriptor(TableDescriptor tableDescriptor) {
+ String tableId = tableDescriptor.getTableId();
+ Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
+ String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
+ Preconditions.checkState(!tableDescriptors.containsKey(tableId)
+ || tableDescriptors.get(tableId) == tableDescriptor,
+ String.format("Cannot add multiple table descriptors with the same tableId: %s", tableId));
+
+ if (tableDescriptor instanceof BaseHybridTableDescriptor) {
+ List<? extends TableDescriptor> tableDescs =
+ ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
+ tableDescs.forEach(td -> addTableDescriptor(td));
+ }
+
+ tableDescriptors.put(tableId, tableDescriptor);
+ }
+
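+ // Registers the system descriptor keyed by its system name. Re-adding the same instance is allowed; registering a different instance under an existing system name fails the precondition check.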
+ private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+ String systemName = systemDescriptor.getSystemName();
+ Preconditions.checkState(!systemDescriptors.containsKey(systemName)
+ || systemDescriptors.get(systemName) == systemDescriptor,
+ "Must not use different system descriptor instances for the same system name: " + systemName);
+ systemDescriptors.put(systemName, systemDescriptor);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
index ec36cf3..e57c957 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -20,14 +20,13 @@ package org.apache.samza.application.descriptors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
@@ -54,10 +53,6 @@ import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class defines:
@@ -67,22 +62,13 @@ import org.slf4j.LoggerFactory;
*/
public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
implements StreamApplicationDescriptor {
- private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
- private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
- private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
- private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
- private final Set<String> broadcastStreams = new HashSet<>();
- private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
- private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
- // We use a LHM for deterministic order in initializing and closing operators.
+ // We use a LHMs for deterministic order in initializing and closing operators.
+ private final Set<String> intermediateBroadcastStreamIds = new HashSet<>();
private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
- private final Map<String, TableImpl> tables = new LinkedHashMap<>();
private final Set<String> operatorIds = new HashSet<>();
- private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
-
/**
* The 0-based position of the next operator in the graph.
* Part of the unique ID for each OperatorSpec in the graph.
@@ -96,17 +82,6 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
}
@Override
- public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
- Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
- Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
- "Default system must be set before creating any input or output streams.");
- addSystemDescriptor(defaultSystemDescriptor);
-
- defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
- return this;
- }
-
- @Override
public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
Optional<StreamExpander> expander = systemDescriptor.getExpander();
@@ -115,26 +90,11 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
}
// TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
- Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
- String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
- inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
- addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+ addInputDescriptor(inputDescriptor);
String streamId = inputDescriptor.getStreamId();
- Preconditions.checkState(!inputOperators.containsKey(streamId),
- "getInputStream must not be called multiple times with the same streamId: " + streamId);
-
Serde serde = inputDescriptor.getSerde();
KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
- if (outputStreams.containsKey(streamId)) {
- OutputStreamImpl outputStream = outputStreams.get(streamId);
- Serde keySerde = outputStream.getKeySerde();
- Serde valueSerde = outputStream.getValueSerde();
- Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
- String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
- + "stream level, so the same key and message Serde must be used for both.", streamId));
- }
-
boolean isKeyed = serde instanceof KVSerde;
InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
InputOperatorSpec inputOperatorSpec =
@@ -146,26 +106,11 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
@Override
public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
- Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
- String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
- outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
- addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+ addOutputDescriptor(outputDescriptor);
String streamId = outputDescriptor.getStreamId();
- Preconditions.checkState(!outputStreams.containsKey(streamId),
- "getOutputStream must not be called multiple times with the same streamId: " + streamId);
-
Serde serde = outputDescriptor.getSerde();
KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
- if (inputOperators.containsKey(streamId)) {
- InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
- Serde keySerde = inputOperatorSpec.getKeySerde();
- Serde valueSerde = inputOperatorSpec.getValueSerde();
- Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
- String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
- + "stream level, so the same key and message Serde must be used for both.", streamId));
- }
-
boolean isKeyed = serde instanceof KVSerde;
outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return outputStreams.get(streamId);
@@ -173,80 +118,10 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
@Override
public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
-
- if (tableDescriptor instanceof BaseHybridTableDescriptor) {
- List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
- tableDescs.forEach(td -> getTable(td));
- }
-
- String tableId = tableDescriptor.getTableId();
- Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
- String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
- Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
- String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
- tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
-
+ addTableDescriptor(tableDescriptor);
BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
- TableSpec tableSpec = baseTableDescriptor.getTableSpec();
- if (tables.containsKey(tableSpec.getId())) {
- throw new IllegalStateException(
- String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
- }
- tables.put(tableSpec.getId(), new TableImpl(tableSpec));
- getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
- return tables.get(tableSpec.getId());
- }
-
- /**
- * Get all the {@link InputDescriptor}s to this application
- *
- * @return an immutable map of streamId to {@link InputDescriptor}
- */
- @Override
- public Map<String, InputDescriptor> getInputDescriptors() {
- return Collections.unmodifiableMap(inputDescriptors);
- }
-
- /**
- * Get all the {@link OutputDescriptor}s from this application
- *
- * @return an immutable map of streamId to {@link OutputDescriptor}
- */
- @Override
- public Map<String, OutputDescriptor> getOutputDescriptors() {
- return Collections.unmodifiableMap(outputDescriptors);
- }
-
- /**
- * Get all the broadcast streamIds from this application
- *
- * @return an immutable set of streamIds
- */
- @Override
- public Set<String> getBroadcastStreams() {
- return Collections.unmodifiableSet(broadcastStreams);
- }
-
- /**
- * Get all the {@link TableDescriptor}s in this application
- *
- * @return an immutable set of {@link TableDescriptor}s
- */
- @Override
- public Set<TableDescriptor> getTableDescriptors() {
- return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
- }
-
- /**
- * Get all the unique {@link SystemDescriptor}s in this application
- *
- * @return an immutable set of {@link SystemDescriptor}s
- */
- @Override
- public Set<SystemDescriptor> getSystemDescriptors() {
- // We enforce that users must not use different system descriptor instances for the same system name
- // when getting an input/output stream or setting the default system descriptor
- return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+ getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
+ return new TableImpl(baseTableDescriptor.getTableSpec());
}
@Override
@@ -259,14 +134,17 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
}
- /**
- * Get the default {@link SystemDescriptor} in this application
- *
- * @return the default {@link SystemDescriptor}
- */
@Override
- public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
- return defaultSystemDescriptorOptional;
+ public Set<String> getIntermediateBroadcastStreamIds() {
+ return Collections.unmodifiableSet(intermediateBroadcastStreamIds);
+ }
+
+ public Map<String, InputOperatorSpec> getInputOperators() {
+ return Collections.unmodifiableMap(inputOperators);
+ }
+
+ public Map<String, OutputStreamImpl> getOutputStreams() {
+ return Collections.unmodifiableMap(outputStreams);
}
public OperatorSpecGraph getOperatorSpecGraph() {
@@ -286,9 +164,10 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
}
+ JobConfig jobConfig = new JobConfig(getConfig());
String nextOpId = String.format("%s-%s-%s-%s",
- config.get(JobConfig.JOB_NAME()),
- config.get(JobConfig.JOB_ID(), "1"),
+ jobConfig.getName().get(),
+ jobConfig.getJobId(),
opCode.name().toLowerCase(),
StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
if (!operatorIds.add(nextOpId)) {
@@ -310,18 +189,6 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
return getNextOpId(opCode, null);
}
- public Map<String, InputOperatorSpec> getInputOperators() {
- return Collections.unmodifiableMap(inputOperators);
- }
-
- public Map<String, OutputStreamImpl> getOutputStreams() {
- return Collections.unmodifiableMap(outputStreams);
- }
-
- public Map<String, TableImpl> getTables() {
- return Collections.unmodifiableMap(tables);
- }
-
/**
* Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
* An intermediate {@link MessageStream} is both an output and an input stream.
@@ -340,7 +207,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
"getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
if (isBroadcast) {
- broadcastStreams.add(streamId);
+ intermediateBroadcastStreamIds.add(streamId);
}
boolean isKeyed = serde instanceof KVSerde;
@@ -356,12 +223,4 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
}
-
- // check uniqueness of the {@code systemDescriptor} and add if it is unique
- private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
- Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
- || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
- "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
- systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
index cb924ab..c62a455 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
@@ -18,19 +18,12 @@
*/
package org.apache.samza.application.descriptors;
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.config.Config;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.task.TaskFactory;
@@ -43,13 +36,6 @@ import org.apache.samza.task.TaskFactory;
*/
public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
implements TaskApplicationDescriptor {
-
- private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
- private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
- private final Set<String> broadcastStreams = new HashSet<>();
- private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
- private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
-
private TaskFactory taskFactory = null;
public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
@@ -65,65 +51,21 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
@Override
public void addInputStream(InputDescriptor inputDescriptor) {
// TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
- Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
- String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+ addInputDescriptor(inputDescriptor);
getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
- inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
- addSystemDescriptor(inputDescriptor.getSystemDescriptor());
}
@Override
public void addOutputStream(OutputDescriptor outputDescriptor) {
- Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
- String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+ addOutputDescriptor(outputDescriptor);
getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
- outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
- addSystemDescriptor(outputDescriptor.getSystemDescriptor());
}
@Override
public void addTable(TableDescriptor tableDescriptor) {
- Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
- String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
- getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
- tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
- }
-
- @Override
- public Map<String, InputDescriptor> getInputDescriptors() {
- return Collections.unmodifiableMap(inputDescriptors);
- }
-
- @Override
- public Map<String, OutputDescriptor> getOutputDescriptors() {
- return Collections.unmodifiableMap(outputDescriptors);
- }
-
- @Override
- public Set<String> getBroadcastStreams() {
- return Collections.unmodifiableSet(broadcastStreams);
- }
-
- @Override
- public Set<TableDescriptor> getTableDescriptors() {
- return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
- }
-
- @Override
- public Set<SystemDescriptor> getSystemDescriptors() {
- // We enforce that users must not use different system descriptor instances for the same system name
- // when getting an input/output stream or setting the default system descriptor
- return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
- }
-
- @Override
- public Set<String> getInputStreamIds() {
- return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
- }
-
- @Override
- public Set<String> getOutputStreamIds() {
- return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
+ addTableDescriptor(tableDescriptor);
+ BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
+ getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
}
/**
@@ -133,12 +75,4 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
public TaskFactory getTaskFactory() {
return taskFactory;
}
-
- // check uniqueness of the {@code systemDescriptor} and add if it is unique
- private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
- Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
- || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
- "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
- systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 6d9faf3..7944dd3 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -273,7 +273,7 @@ import org.slf4j.LoggerFactory;
String streamId = streamSpec.getId();
StreamEdge edge = edges.get(streamId);
if (edge == null) {
- boolean isBroadcast = appDesc.getBroadcastStreams().contains(streamId);
+ boolean isBroadcast = appDesc.getIntermediateBroadcastStreamIds().contains(streamId);
edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
edges.put(streamId, edge);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index a83739d..a34d603 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -43,7 +43,7 @@ public class OperatorSpecGraph implements Serializable {
// We use a LHM for deterministic order in initializing and closing operators.
private final Map<String, InputOperatorSpec> inputOperators;
private final Map<String, OutputStreamImpl> outputStreams;
- private final Set<String> broadcastStreams;
+ private final Set<String> intermediateBroadcastStreamIds;
private final Set<OperatorSpec> allOpSpecs;
private final boolean hasWindowOrJoins;
@@ -54,7 +54,7 @@ public class OperatorSpecGraph implements Serializable {
public OperatorSpecGraph(StreamApplicationDescriptorImpl streamAppDesc) {
this.inputOperators = streamAppDesc.getInputOperators();
this.outputStreams = streamAppDesc.getOutputStreams();
- this.broadcastStreams = streamAppDesc.getBroadcastStreams();
+ this.intermediateBroadcastStreamIds = streamAppDesc.getIntermediateBroadcastStreamIds();
this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
this.hasWindowOrJoins = checkWindowOrJoins();
this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
@@ -68,8 +68,8 @@ public class OperatorSpecGraph implements Serializable {
return outputStreams;
}
- public Set<String> getBroadcastStreams() {
- return broadcastStreams;
+ public Set<String> getIntermediateBroadcastStreamIds() {
+ return intermediateBroadcastStreamIds;
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
index d889486..3b680fc 100644
--- a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
@@ -19,24 +19,27 @@
package org.apache.samza.application.descriptors;
import com.google.common.collect.ImmutableList;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.system.descriptors.ExpandingInputDescriptorProvider;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.system.descriptors.ExpandingInputDescriptorProvider;
+import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.SystemDescriptor;
import org.apache.samza.system.descriptors.TransformingInputDescriptorProvider;
import org.apache.samza.system.descriptors.InputTransformer;
@@ -52,15 +55,13 @@ import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -70,11 +71,10 @@ import static org.mockito.Mockito.when;
* Unit test for {@link StreamApplicationDescriptorImpl}
*/
public class TestStreamApplicationDescriptorImpl {
-
@Test
public void testConstructor() {
StreamApplication mockApp = mock(StreamApplication.class);
- Config mockConfig = mock(Config.class);
+ Config mockConfig = getConfig();
StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(mockApp, mockConfig);
verify(mockApp).describe(appDesc);
assertEquals(mockConfig, appDesc.getConfig());
@@ -89,7 +89,7 @@ public class TestStreamApplicationDescriptorImpl {
GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
- }, mock(Config.class));
+ }, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -112,7 +112,7 @@ public class TestStreamApplicationDescriptorImpl {
GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
- }, mock(Config.class));
+ }, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -128,7 +128,7 @@ public class TestStreamApplicationDescriptorImpl {
GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
- }, mock(Config.class));
+ }, getConfig());
}
@Test
@@ -140,7 +140,7 @@ public class TestStreamApplicationDescriptorImpl {
MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
- }, mock(Config.class));
+ }, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -166,7 +166,7 @@ public class TestStreamApplicationDescriptorImpl {
MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde());
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
- }, mock(Config.class));
+ }, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(expandedStreamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -184,7 +184,7 @@ public class TestStreamApplicationDescriptorImpl {
GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
- }, mock(Config.class));
+ }, getConfig());
InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -203,7 +203,7 @@ public class TestStreamApplicationDescriptorImpl {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd1);
appDesc.getInputStream(isd2);
- }, mock(Config.class));
+ }, getConfig());
InputOperatorSpec inputOpSpec1 = streamAppDesc.getInputOperators().get(streamId1);
InputOperatorSpec inputOpSpec2 = streamAppDesc.getInputOperators().get(streamId2);
@@ -226,7 +226,7 @@ public class TestStreamApplicationDescriptorImpl {
appDesc.getInputStream(isd1);
// should throw exception
appDesc.getInputStream(isd2);
- }, mock(Config.class));
+ }, getConfig());
}
@Test
@@ -248,7 +248,7 @@ public class TestStreamApplicationDescriptorImpl {
appDesc.getOutputStream(osd1);
fail("adding output stream with the same system name but different SystemDescriptor should have failed");
} catch (IllegalStateException e) { }
- }, mock(Config.class));
+ }, getConfig());
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.withDefaultSystem(sd2);
@@ -256,7 +256,7 @@ public class TestStreamApplicationDescriptorImpl {
appDesc.getInputStream(isd1);
fail("Adding input stream with the same system name as the default system but different SystemDescriptor should have failed");
} catch (IllegalStateException e) { }
- }, mock(Config.class));
+ }, getConfig());
}
@Test
@@ -272,7 +272,7 @@ public class TestStreamApplicationDescriptorImpl {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
- }, mock(Config.class));
+ }, getConfig());
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
assertEquals(streamId, outputStreamImpl.getStreamId());
@@ -288,7 +288,7 @@ public class TestStreamApplicationDescriptorImpl {
GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
- }, mock(Config.class));
+ }, getConfig());
}
@Test
@@ -300,7 +300,7 @@ public class TestStreamApplicationDescriptorImpl {
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
- }, mock(Config.class));
+ }, getConfig());
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
assertEquals(streamId, outputStreamImpl.getStreamId());
@@ -318,7 +318,7 @@ public class TestStreamApplicationDescriptorImpl {
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getInputStream(isd);
appDesc.withDefaultSystem(sd); // should throw exception
- }, mock(Config.class));
+ }, getConfig());
}
@Test(expected = IllegalStateException.class)
@@ -329,13 +329,13 @@ public class TestStreamApplicationDescriptorImpl {
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd);
appDesc.withDefaultSystem(sd); // should throw exception
- }, mock(Config.class));
+ }, getConfig());
}
@Test(expected = IllegalStateException.class)
public void testSetDefaultSystemDescriptorAfterGettingIntermediateStream() {
String streamId = "test-stream-1";
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
streamAppDesc.withDefaultSystem(mock(SystemDescriptor.class)); // should throw exception
}
@@ -349,13 +349,13 @@ public class TestStreamApplicationDescriptorImpl {
new StreamApplicationDescriptorImpl(appDesc -> {
appDesc.getOutputStream(osd1);
appDesc.getOutputStream(osd2); // should throw exception
- }, mock(Config.class));
+ }, getConfig());
}
@Test
public void testGetIntermediateStreamWithValueSerde() {
String streamId = "stream-1";
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
Serde mockValueSerde = mock(Serde.class);
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
@@ -373,7 +373,7 @@ public class TestStreamApplicationDescriptorImpl {
@Test
public void testGetIntermediateStreamWithKeyValueSerde() {
String streamId = "streamId";
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
@@ -394,7 +394,7 @@ public class TestStreamApplicationDescriptorImpl {
@Test
public void testGetIntermediateStreamWithDefaultSystemDescriptor() {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = getConfig();
String streamId = "streamId";
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
@@ -410,7 +410,7 @@ public class TestStreamApplicationDescriptorImpl {
@Test(expected = NullPointerException.class)
public void testGetIntermediateStreamWithNoSerde() {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = getConfig();
String streamId = "streamId";
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
@@ -420,7 +420,7 @@ public class TestStreamApplicationDescriptorImpl {
@Test(expected = IllegalStateException.class)
public void testGetSameIntermediateStreamTwice() {
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
// should throw exception
streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
@@ -428,11 +428,12 @@ public class TestStreamApplicationDescriptorImpl {
@Test
public void testGetNextOpIdIncrementsId() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+ HashMap<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME(), "jobName");
+ configMap.put(JobConfig.JOB_ID(), "1234");
+ Config config = new MapConfig(configMap);
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
assertEquals("jobName-1234-merge-0", streamAppDesc.getNextOpId(OpCode.MERGE, null));
assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
assertEquals("jobName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, null));
@@ -440,22 +441,24 @@ public class TestStreamApplicationDescriptorImpl {
@Test(expected = SamzaException.class)
public void testGetNextOpIdRejectsDuplicates() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+ HashMap<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME(), "jobName");
+ configMap.put(JobConfig.JOB_ID(), "1234");
+ Config config = new MapConfig(configMap);
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
}
@Test
public void testOpIdValidation() {
- Config mockConfig = mock(Config.class);
- when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
- when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+ HashMap<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME(), "jobName");
+ configMap.put(JobConfig.JOB_ID(), "1234");
+ Config config = new MapConfig(configMap);
- StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
// null and empty userDefinedIDs should fall back to autogenerated IDs.
try {
@@ -487,7 +490,7 @@ public class TestStreamApplicationDescriptorImpl {
@Test
public void testGetInputStreamPreservesInsertionOrder() {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = getConfig();
String testStreamId1 = "test-stream-1";
String testStreamId2 = "test-stream-2";
@@ -509,24 +512,25 @@ public class TestStreamApplicationDescriptorImpl {
@Test
public void testGetTable() throws Exception {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = getConfig();
BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde());
+ AtomicReference<TableImpl> table = new AtomicReference<>();
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
- appDesc.getTable(mockTableDescriptor);
+ table.set((TableImpl) appDesc.getTable(mockTableDescriptor));
}, mockConfig);
- assertNotNull(streamAppDesc.getTables().get(testTableSpec.getId()));
+ assertEquals(testTableSpec.getId(), table.get().getTableSpec().getId());
}
@Test
public void testApplicationContainerContextFactory() {
ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory));
}
@@ -534,7 +538,7 @@ public class TestStreamApplicationDescriptorImpl {
public void testNoApplicationContainerContextFactory() {
StreamApplication testApp = appDesc -> {
};
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty());
}
@@ -542,7 +546,7 @@ public class TestStreamApplicationDescriptorImpl {
public void testApplicationTaskContextFactory() {
ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory));
}
@@ -550,7 +554,7 @@ public class TestStreamApplicationDescriptorImpl {
public void testNoApplicationTaskContextFactory() {
StreamApplication testApp = appDesc -> {
};
- StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty());
}
@@ -558,13 +562,13 @@ public class TestStreamApplicationDescriptorImpl {
public void testProcessorLifecycleListenerFactory() {
ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
StreamApplication testApp = appSpec -> appSpec.withProcessorLifecycleListenerFactory(mockFactory);
- StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+ StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, getConfig());
assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
}
@Test(expected = IllegalStateException.class)
public void testGetTableWithBadId() {
- Config mockConfig = mock(Config.class);
+ Config mockConfig = getConfig();
new StreamApplicationDescriptorImpl(appDesc -> {
BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
when(mockTableDescriptor.getTableId()).thenReturn("my.table");
@@ -572,6 +576,12 @@ public class TestStreamApplicationDescriptorImpl {
}, mockConfig);
}
+ private Config getConfig() {
+ HashMap<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME(), "test-job");
+ return new MapConfig(configMap);
+ }
+
class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
super(systemName, "factory.class", null, expander);
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 6208206..f49958c 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -765,7 +765,7 @@ public class TestExecutionPlanner {
when(taskAppDesc.getOutputStreamIds()).thenReturn(outputDescriptors.keySet());
when(taskAppDesc.getTableDescriptors()).thenReturn(Collections.emptySet());
when(taskAppDesc.getSystemDescriptors()).thenReturn(systemDescriptors);
- when(taskAppDesc.getBroadcastStreams()).thenReturn(broadcastStreams);
+ when(taskAppDesc.getIntermediateBroadcastStreamIds()).thenReturn(broadcastStreams);
doReturn(MockTaskApplication.class).when(taskAppDesc).getAppClass();
Map<String, String> systemStreamConfigs = new HashMap<>();
@@ -796,7 +796,7 @@ public class TestExecutionPlanner {
when(taskAppDesc.getOutputDescriptors()).thenReturn(new HashMap<>());
when(taskAppDesc.getTableDescriptors()).thenReturn(new HashSet<>());
when(taskAppDesc.getSystemDescriptors()).thenReturn(new HashSet<>());
- when(taskAppDesc.getBroadcastStreams()).thenReturn(new HashSet<>());
+ when(taskAppDesc.getIntermediateBroadcastStreamIds()).thenReturn(new HashSet<>());
doReturn(LegacyTaskApplication.class).when(taskAppDesc).getAppClass();
Map<String, String> systemStreamConfigs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 6bbd674..2da5858 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -86,134 +86,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestOperatorImplGraph {
- private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) {
- List<OperatorImpl> operators = new ArrayList<>();
- operators.add(op);
- while (!operators.isEmpty()) {
- OperatorImpl opImpl = operators.remove(0);
- s.add(opImpl);
- if (!opImpl.registeredOperators.isEmpty()) {
- operators.addAll(opImpl.registeredOperators);
- }
- }
- }
-
- static class TestMapFunction<M, OM> extends BaseTestFunction implements MapFunction<M, OM> {
- final Function<M, OM> mapFn;
-
- public TestMapFunction(String opId, Function<M, OM> mapFn) {
- super(opId);
- this.mapFn = mapFn;
- }
-
- @Override
- public OM apply(M message) {
- return this.mapFn.apply(message);
- }
- }
-
- static class TestJoinFunction<K, M, JM, RM> extends BaseTestFunction implements JoinFunction<K, M, JM, RM> {
- final BiFunction<M, JM, RM> joiner;
- final Function<M, K> firstKeyFn;
- final Function<JM, K> secondKeyFn;
- final Collection<RM> joinResults = new HashSet<>();
-
- public TestJoinFunction(String opId, BiFunction<M, JM, RM> joiner, Function<M, K> firstKeyFn, Function<JM, K> secondKeyFn) {
- super(opId);
- this.joiner = joiner;
- this.firstKeyFn = firstKeyFn;
- this.secondKeyFn = secondKeyFn;
- }
-
- @Override
- public RM apply(M message, JM otherMessage) {
- RM result = this.joiner.apply(message, otherMessage);
- this.joinResults.add(result);
- return result;
- }
-
- @Override
- public K getFirstKey(M message) {
- return this.firstKeyFn.apply(message);
- }
-
- @Override
- public K getSecondKey(JM message) {
- return this.secondKeyFn.apply(message);
- }
- }
-
- static abstract class BaseTestFunction implements InitableFunction, ClosableFunction, Serializable {
-
- static Map<TaskName, Map<String, BaseTestFunction>> perTaskFunctionMap = new HashMap<>();
- static Map<TaskName, List<String>> perTaskInitList = new HashMap<>();
- static Map<TaskName, List<String>> perTaskCloseList = new HashMap<>();
- int numInitCalled = 0;
- int numCloseCalled = 0;
- TaskName taskName = null;
- final String opId;
-
- public BaseTestFunction(String opId) {
- this.opId = opId;
- }
-
- static public void reset() {
- perTaskFunctionMap.clear();
- perTaskCloseList.clear();
- perTaskInitList.clear();
- }
-
- static public BaseTestFunction getInstanceByTaskName(TaskName taskName, String opId) {
- return perTaskFunctionMap.get(taskName).get(opId);
- }
-
- static public List<String> getInitListByTaskName(TaskName taskName) {
- return perTaskInitList.get(taskName);
- }
-
- static public List<String> getCloseListByTaskName(TaskName taskName) {
- return perTaskCloseList.get(taskName);
- }
-
- @Override
- public void close() {
- if (this.taskName == null) {
- throw new IllegalStateException("Close called before init");
- }
- if (perTaskFunctionMap.get(this.taskName) == null || !perTaskFunctionMap.get(this.taskName).containsKey(opId)) {
- throw new IllegalStateException("Close called before init");
- }
-
- if (perTaskCloseList.get(this.taskName) == null) {
- perTaskCloseList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
- } else {
- perTaskCloseList.get(taskName).add(opId);
- }
-
- this.numCloseCalled++;
- }
-
- @Override
- public void init(Context context) {
- TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
- if (perTaskFunctionMap.get(taskName) == null) {
- perTaskFunctionMap.put(taskName, new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
- } else {
- if (perTaskFunctionMap.get(taskName).containsKey(opId)) {
- throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName()));
- }
- perTaskFunctionMap.get(taskName).put(opId, this);
- }
- if (perTaskInitList.get(taskName) == null) {
- perTaskInitList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
- } else {
- perTaskInitList.get(taskName).add(opId);
- }
- this.taskName = taskName;
- this.numInitCalled++;
- }
- }
-
private Context context;
@Before
@@ -357,6 +229,8 @@ public class TestOperatorImplGraph {
String inputSystem = "input-system";
String inputPhysicalName = "input-stream";
HashMap<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME(), "test-job");
+ configMap.put(JobConfig.JOB_ID(), "1");
StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName);
Config config = new MapConfig(configMap);
when(this.context.getJobContext().getConfig()).thenReturn(config);
@@ -391,7 +265,7 @@ public class TestOperatorImplGraph {
MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
stream1.merge(Collections.singleton(stream2))
.map(new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m));
- }, mock(Config.class));
+ }, getConfig());
TaskName mockTaskName = mock(TaskName.class);
TaskModel taskModel = mock(TaskModel.class);
@@ -495,7 +369,6 @@ public class TestOperatorImplGraph {
String inputStreamId1 = "input1";
String inputStreamId2 = "input2";
String inputSystem = "input-system";
- Config mockConfig = mock(Config.class);
TaskName mockTaskName = mock(TaskName.class);
TaskModel taskModel = mock(TaskModel.class);
@@ -515,7 +388,7 @@ public class TestOperatorImplGraph {
inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn))
.map(new TestMapFunction<Object, Object>("4", mapFn));
- }, mockConfig);
+ }, getConfig());
OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, SystemClock.instance());
@@ -693,4 +566,138 @@ public class TestOperatorImplGraph {
assertTrue(counts.get(int1) == 3);
assertTrue(counts.get(int2) == 2);
}
+
+ private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) {
+ List<OperatorImpl> operators = new ArrayList<>();
+ operators.add(op);
+ while (!operators.isEmpty()) {
+ OperatorImpl opImpl = operators.remove(0);
+ s.add(opImpl);
+ if (!opImpl.registeredOperators.isEmpty()) {
+ operators.addAll(opImpl.registeredOperators);
+ }
+ }
+ }
+
+ private Config getConfig() {
+ HashMap<String, String> configMap = new HashMap<>();
+ configMap.put(JobConfig.JOB_NAME(), "test-job");
+ configMap.put(JobConfig.JOB_ID(), "1");
+ return new MapConfig(configMap);
+ }
+
+ private static class TestMapFunction<M, OM> extends BaseTestFunction implements MapFunction<M, OM> {
+ final Function<M, OM> mapFn;
+
+ public TestMapFunction(String opId, Function<M, OM> mapFn) {
+ super(opId);
+ this.mapFn = mapFn;
+ }
+
+ @Override
+ public OM apply(M message) {
+ return this.mapFn.apply(message);
+ }
+ }
+
+ private static class TestJoinFunction<K, M, JM, RM> extends BaseTestFunction implements JoinFunction<K, M, JM, RM> {
+ final BiFunction<M, JM, RM> joiner;
+ final Function<M, K> firstKeyFn;
+ final Function<JM, K> secondKeyFn;
+ final Collection<RM> joinResults = new HashSet<>();
+
+ public TestJoinFunction(String opId, BiFunction<M, JM, RM> joiner, Function<M, K> firstKeyFn, Function<JM, K> secondKeyFn) {
+ super(opId);
+ this.joiner = joiner;
+ this.firstKeyFn = firstKeyFn;
+ this.secondKeyFn = secondKeyFn;
+ }
+
+ @Override
+ public RM apply(M message, JM otherMessage) {
+ RM result = this.joiner.apply(message, otherMessage);
+ this.joinResults.add(result);
+ return result;
+ }
+
+ @Override
+ public K getFirstKey(M message) {
+ return this.firstKeyFn.apply(message);
+ }
+
+ @Override
+ public K getSecondKey(JM message) {
+ return this.secondKeyFn.apply(message);
+ }
+ }
+
+ private static abstract class BaseTestFunction implements InitableFunction, ClosableFunction, Serializable {
+ static Map<TaskName, Map<String, BaseTestFunction>> perTaskFunctionMap = new HashMap<>();
+ static Map<TaskName, List<String>> perTaskInitList = new HashMap<>();
+ static Map<TaskName, List<String>> perTaskCloseList = new HashMap<>();
+ int numInitCalled = 0;
+ int numCloseCalled = 0;
+ TaskName taskName = null;
+ final String opId;
+
+ public BaseTestFunction(String opId) {
+ this.opId = opId;
+ }
+
+ static public void reset() {
+ perTaskFunctionMap.clear();
+ perTaskCloseList.clear();
+ perTaskInitList.clear();
+ }
+
+ static public BaseTestFunction getInstanceByTaskName(TaskName taskName, String opId) {
+ return perTaskFunctionMap.get(taskName).get(opId);
+ }
+
+ static public List<String> getInitListByTaskName(TaskName taskName) {
+ return perTaskInitList.get(taskName);
+ }
+
+ static public List<String> getCloseListByTaskName(TaskName taskName) {
+ return perTaskCloseList.get(taskName);
+ }
+
+ @Override
+ public void close() {
+ if (this.taskName == null) {
+ throw new IllegalStateException("Close called before init");
+ }
+ if (perTaskFunctionMap.get(this.taskName) == null || !perTaskFunctionMap.get(this.taskName).containsKey(opId)) {
+ throw new IllegalStateException("Close called before init");
+ }
+
+ if (perTaskCloseList.get(this.taskName) == null) {
+ perTaskCloseList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
+ } else {
+ perTaskCloseList.get(taskName).add(opId);
+ }
+
+ this.numCloseCalled++;
+ }
+
+ @Override
+ public void init(Context context) {
+ TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
+ if (perTaskFunctionMap.get(taskName) == null) {
+ perTaskFunctionMap.put(taskName, new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
+ } else {
+ if (perTaskFunctionMap.get(taskName).containsKey(opId)) {
+ throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName()));
+ }
+ perTaskFunctionMap.get(taskName).put(opId, this);
+ }
+ if (perTaskInitList.get(taskName) == null) {
+ perTaskInitList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
+ } else {
+ perTaskInitList.get(taskName).add(opId);
+ }
+ this.taskName = taskName;
+ this.numInitCalled++;
+ }
+ }
}