You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:48 UTC
[08/12] samza git commit: Consolidating package names for System,
Stream, Application and Table descriptors.
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
index e9e2635..2b29a2b 100644
--- a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
+++ b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.application;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.task.TaskFactoryUtil;
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
deleted file mode 100644
index 3bc3ed5..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.TableImpl;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.hybrid.BaseHybridTableDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class defines:
- * 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link MessageStream}, {@link OutputStream},
- * and {@link Table} to create the DAG of transforms.
- * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
- */
-public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
- implements StreamApplicationDescriptor {
- private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
- private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
- private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
- private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
- private final Set<String> broadcastStreams = new HashSet<>();
- private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
- private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
- // We use a LHM for deterministic order in initializing and closing operators.
- private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
- private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
- private final Map<String, TableImpl> tables = new LinkedHashMap<>();
- private final Set<String> operatorIds = new HashSet<>();
-
- private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
-
- /**
- * The 0-based position of the next operator in the graph.
- * Part of the unique ID for each OperatorSpec in the graph.
- * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}.
- */
- private int nextOpNum = 0;
-
- public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) {
- super(userApp, config);
- userApp.describe(this);
- }
-
- @Override
- public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
- Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
- Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
- "Default system must be set before creating any input or output streams.");
- addSystemDescriptor(defaultSystemDescriptor);
-
- defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
- return this;
- }
-
- @Override
- public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
- SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
- Optional<StreamExpander> expander = systemDescriptor.getExpander();
- if (expander.isPresent()) {
- return expander.get().apply(this, inputDescriptor);
- }
-
- // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
- Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
- String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
- inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
- addSystemDescriptor(inputDescriptor.getSystemDescriptor());
-
- String streamId = inputDescriptor.getStreamId();
- Preconditions.checkState(!inputOperators.containsKey(streamId),
- "getInputStream must not be called multiple times with the same streamId: " + streamId);
-
- Serde serde = inputDescriptor.getSerde();
- KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
- if (outputStreams.containsKey(streamId)) {
- OutputStreamImpl outputStream = outputStreams.get(streamId);
- Serde keySerde = outputStream.getKeySerde();
- Serde valueSerde = outputStream.getValueSerde();
- Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
- String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
- + "stream level, so the same key and message Serde must be used for both.", streamId));
- }
-
- boolean isKeyed = serde instanceof KVSerde;
- InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
- InputOperatorSpec inputOperatorSpec =
- OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
- transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
- inputOperators.put(streamId, inputOperatorSpec);
- return new MessageStreamImpl(this, inputOperators.get(streamId));
- }
-
- @Override
- public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
- Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
- String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
- outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
- addSystemDescriptor(outputDescriptor.getSystemDescriptor());
-
- String streamId = outputDescriptor.getStreamId();
- Preconditions.checkState(!outputStreams.containsKey(streamId),
- "getOutputStream must not be called multiple times with the same streamId: " + streamId);
-
- Serde serde = outputDescriptor.getSerde();
- KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
- if (inputOperators.containsKey(streamId)) {
- InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
- Serde keySerde = inputOperatorSpec.getKeySerde();
- Serde valueSerde = inputOperatorSpec.getValueSerde();
- Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
- String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
- + "stream level, so the same key and message Serde must be used for both.", streamId));
- }
-
- boolean isKeyed = serde instanceof KVSerde;
- outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
- return outputStreams.get(streamId);
- }
-
- @Override
- public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
-
- if (tableDescriptor instanceof BaseHybridTableDescriptor) {
- List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
- tableDescs.forEach(td -> getTable(td));
- }
-
- String tableId = tableDescriptor.getTableId();
- Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
- String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
- Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
- String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
- tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
-
- BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
- TableSpec tableSpec = baseTableDescriptor.getTableSpec();
- if (tables.containsKey(tableSpec.getId())) {
- throw new IllegalStateException(
- String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
- }
- tables.put(tableSpec.getId(), new TableImpl(tableSpec));
- getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
- return tables.get(tableSpec.getId());
- }
-
- /**
- * Get all the {@link InputDescriptor}s to this application
- *
- * @return an immutable map of streamId to {@link InputDescriptor}
- */
- @Override
- public Map<String, InputDescriptor> getInputDescriptors() {
- return Collections.unmodifiableMap(inputDescriptors);
- }
-
- /**
- * Get all the {@link OutputDescriptor}s from this application
- *
- * @return an immutable map of streamId to {@link OutputDescriptor}
- */
- @Override
- public Map<String, OutputDescriptor> getOutputDescriptors() {
- return Collections.unmodifiableMap(outputDescriptors);
- }
-
- /**
- * Get all the broadcast streamIds from this application
- *
- * @return an immutable set of streamIds
- */
- @Override
- public Set<String> getBroadcastStreams() {
- return Collections.unmodifiableSet(broadcastStreams);
- }
-
- /**
- * Get all the {@link TableDescriptor}s in this application
- *
- * @return an immutable set of {@link TableDescriptor}s
- */
- @Override
- public Set<TableDescriptor> getTableDescriptors() {
- return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
- }
-
- /**
- * Get all the unique {@link SystemDescriptor}s in this application
- *
- * @return an immutable set of {@link SystemDescriptor}s
- */
- @Override
- public Set<SystemDescriptor> getSystemDescriptors() {
- // We enforce that users must not use different system descriptor instances for the same system name
- // when getting an input/output stream or setting the default system descriptor
- return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
- }
-
- @Override
- public Set<String> getInputStreamIds() {
- return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
- }
-
- @Override
- public Set<String> getOutputStreamIds() {
- return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
- }
-
- /**
- * Get the default {@link SystemDescriptor} in this application
- *
- * @return the default {@link SystemDescriptor}
- */
- @Override
- public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
- return defaultSystemDescriptorOptional;
- }
-
- public OperatorSpecGraph getOperatorSpecGraph() {
- return new OperatorSpecGraph(this);
- }
-
- /**
- * Gets the unique ID for the next operator in the graph. The ID is of the following format:
- * jobName-jobId-opCode-(userDefinedId|nextOpNum);
- *
- * @param opCode the {@link OpCode} of the next operator
- * @param userDefinedId the optional user-provided name of the next operator or null
- * @return the unique ID for the next operator in the graph
- */
- public String getNextOpId(OpCode opCode, String userDefinedId) {
- if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
- throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
- }
-
- String nextOpId = String.format("%s-%s-%s-%s",
- config.get(JobConfig.JOB_NAME()),
- config.get(JobConfig.JOB_ID(), "1"),
- opCode.name().toLowerCase(),
- StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
- if (!operatorIds.add(nextOpId)) {
- throw new SamzaException(
- String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
- }
- nextOpNum++;
- return nextOpId;
- }
-
- /**
- * Gets the unique ID for the next operator in the graph. The ID is of the following format:
- * jobName-jobId-opCode-nextOpNum;
- *
- * @param opCode the {@link OpCode} of the next operator
- * @return the unique ID for the next operator in the graph
- */
- public String getNextOpId(OpCode opCode) {
- return getNextOpId(opCode, null);
- }
-
- public Map<String, InputOperatorSpec> getInputOperators() {
- return Collections.unmodifiableMap(inputOperators);
- }
-
- public Map<String, OutputStreamImpl> getOutputStreams() {
- return Collections.unmodifiableMap(outputStreams);
- }
-
- public Map<String, TableImpl> getTables() {
- return Collections.unmodifiableMap(tables);
- }
-
- /**
- * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
- * An intermediate {@link MessageStream} is both an output and an input stream.
- *
- * @param streamId the id of the stream to be created.
- * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
- * is used.
- * @param isBroadcast whether the stream is a broadcast stream.
- * @param <M> the type of messages in the intermediate {@link MessageStream}
- * @return the intermediate {@link MessageStreamImpl}
- */
- @VisibleForTesting
- public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
- Preconditions.checkNotNull(serde, "serde must not be null for intermediate stream: " + streamId);
- Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
- "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
-
- if (isBroadcast) {
- broadcastStreams.add(streamId);
- }
-
- boolean isKeyed = serde instanceof KVSerde;
- KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
-
- InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
- .flatMap(SystemDescriptor::getTransformer).orElse(null);
-
- InputOperatorSpec inputOperatorSpec =
- OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
- transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
- inputOperators.put(streamId, inputOperatorSpec);
- outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
- return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
- }
-
- // check uniqueness of the {@code systemDescriptor} and add if it is unique
- private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
- Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
- || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
- "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
- systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
deleted file mode 100644
index b4fde1e..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.task.TaskFactory;
-
-
-/**
- * This class implements interface {@link TaskApplicationDescriptor}.
- * <p>
- * In addition to the common objects for an application defined in {@link ApplicationDescriptorImpl}, this class also includes
- * the low-level {@link TaskFactory} that creates user-defined task instances, the lists of input/broadcast/output streams,
- * and the list of {@link TableDescriptor}s used in the application.
- */
-public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
- implements TaskApplicationDescriptor {
-
- private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
- private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
- private final Set<String> broadcastStreams = new HashSet<>();
- private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
- private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
-
- private TaskFactory taskFactory = null;
-
- public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
- super(userApp, config);
- userApp.describe(this);
- }
-
- @Override
- public void setTaskFactory(TaskFactory factory) {
- this.taskFactory = factory;
- }
-
- @Override
- public void addInputStream(InputDescriptor inputDescriptor) {
- // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
- Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
- String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
- getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
- inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
- addSystemDescriptor(inputDescriptor.getSystemDescriptor());
- }
-
- @Override
- public void addOutputStream(OutputDescriptor outputDescriptor) {
- Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
- String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
- getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
- outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
- addSystemDescriptor(outputDescriptor.getSystemDescriptor());
- }
-
- @Override
- public void addTable(TableDescriptor tableDescriptor) {
- Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
- String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
- getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
- tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
- }
-
- @Override
- public Map<String, InputDescriptor> getInputDescriptors() {
- return Collections.unmodifiableMap(inputDescriptors);
- }
-
- @Override
- public Map<String, OutputDescriptor> getOutputDescriptors() {
- return Collections.unmodifiableMap(outputDescriptors);
- }
-
- @Override
- public Set<String> getBroadcastStreams() {
- return Collections.unmodifiableSet(broadcastStreams);
- }
-
- @Override
- public Set<TableDescriptor> getTableDescriptors() {
- return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
- }
-
- @Override
- public Set<SystemDescriptor> getSystemDescriptors() {
- // We enforce that users must not use different system descriptor instances for the same system name
- // when getting an input/output stream or setting the default system descriptor
- return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
- }
-
- @Override
- public Set<String> getInputStreamIds() {
- return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
- }
-
- @Override
- public Set<String> getOutputStreamIds() {
- return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
- }
-
- /**
- * Get the user-defined {@link TaskFactory}
- * @return the {@link TaskFactory} object
- */
- public TaskFactory getTaskFactory() {
- return taskFactory;
- }
-
- // check uniqueness of the {@code systemDescriptor} and add if it is unique
- private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
- Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
- || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
- "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
- systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
new file mode 100644
index 0000000..f3c34a9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ApplicationTaskContext;
+import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.metrics.MetricsReporterFactory;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.runtime.ProcessorLifecycleListener;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is the base class that implements interface {@link ApplicationDescriptor}.
+ * <p>
+ * This base class contains the common objects that are used by both high-level and low-level API applications, such as
+ * {@link Config}, {@link ApplicationContainerContextFactory}, {@link ApplicationTaskContextFactory}, and
+ * {@link ProcessorLifecycleListenerFactory}.
+ *
+ * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either
+ * {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor}
+ */
+public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
+ implements ApplicationDescriptor<S> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
+
+ private final Class<? extends SamzaApplication> appClass;
+ private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
+ // serdes used by input/output/intermediate streams, keyed by streamId
+ private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
+ // serdes used by tables, keyed by tableId
+ private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
+ final Config config;
+
+ private Optional<ApplicationContainerContextFactory<?>> applicationContainerContextFactoryOptional = Optional.empty();
+ private Optional<ApplicationTaskContextFactory<?>> applicationTaskContextFactoryOptional = Optional.empty();
+
+ // Default to no-op ProcessorLifecycleListenerFactory
+ ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new ProcessorLifecycleListener() { };
+
+ ApplicationDescriptorImpl(SamzaApplication app, Config config) {
+ this.config = config;
+ this.appClass = app.getClass();
+ }
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
+
+ @Override
+ public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
+ this.applicationContainerContextFactoryOptional = Optional.of(factory);
+ return (S) this;
+ }
+
+ @Override
+ public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
+ this.applicationTaskContextFactoryOptional = Optional.of(factory);
+ return (S) this;
+ }
+
+ @Override
+ public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
+ this.listenerFactory = listenerFactory;
+ return (S) this;
+ }
+
+ @Override
+ public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
+ this.reporterFactories.clear();
+ this.reporterFactories.putAll(reporterFactories);
+ return (S) this;
+ }
+
+ /**
+ * Get the application class
+ *
+ * @return an implementation of {@link SamzaApplication}
+ */
+ public Class<? extends SamzaApplication> getAppClass() {
+ return appClass;
+ }
+
+ /**
+ * Get the {@link ApplicationContainerContextFactory} specified by the application.
+ *
+ * @return {@link ApplicationContainerContextFactory} if application specified it; empty otherwise
+ */
+ public Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> getApplicationContainerContextFactory() {
+ @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationContainerContext
+ Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> factoryOptional =
+ (Optional) this.applicationContainerContextFactoryOptional;
+ return factoryOptional;
+ }
+
+ /**
+ * Get the {@link ApplicationTaskContextFactory} specified by the application.
+ *
+ * @return {@link ApplicationTaskContextFactory} if application specified it; empty otherwise
+ */
+ public Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> getApplicationTaskContextFactory() {
+ @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationTaskContext
+ Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> factoryOptional =
+ (Optional) this.applicationTaskContextFactoryOptional;
+ return factoryOptional;
+ }
+
+ /**
+ * Get the {@link ProcessorLifecycleListenerFactory} associated with this application
+ *
+ * @return the {@link ProcessorLifecycleListenerFactory} in this application
+ */
+ public ProcessorLifecycleListenerFactory getProcessorLifecycleListenerFactory() {
+ return listenerFactory;
+ }
+
+ /**
+ * Get the {@link MetricsReporterFactory}s used in the application
+ *
+ * @return the map of {@link MetricsReporterFactory}s
+ */
+ public Map<String, MetricsReporterFactory> getMetricsReporterFactories() {
+ return Collections.unmodifiableMap(reporterFactories);
+ }
+
+ /**
+  * Returns the default {@link SystemDescriptor} of this application.
+  * <p>
+  * This base implementation has no default system and always returns an empty {@link Optional}.
+  *
+  * @return the default {@link SystemDescriptor}, empty in this implementation
+  */
+ public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+   // No default system is set at this level.
+   Optional<SystemDescriptor> noDefault = Optional.empty();
+   return noDefault;
+ }
+
+ /**
+ * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+ *
+ * @param streamId id of the stream
+ * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+ */
+ public KV<Serde, Serde> getStreamSerdes(String streamId) {
+ return streamSerdes.get(streamId);
+ }
+
+ /**
+ * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+ *
+ * @param tableId id of the table
+ * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+ */
+ public KV<Serde, Serde> getTableSerdes(String tableId) {
+ return tableSerdes.get(tableId);
+ }
+
+ /**
+  * Get the map of all {@link InputOperatorSpec}s in this application
+  *
+  * @return an immutable map from streamId to {@link InputOperatorSpec}. Default to empty map for low-level
+  * {@link org.apache.samza.application.TaskApplication}
+  */
+ public Map<String, InputOperatorSpec> getInputOperators() {
+   // Collections.emptyMap() is the same shared immutable instance as the raw Collections.EMPTY_MAP constant,
+   // but it is generic, so no unchecked conversion is needed.
+   return Collections.emptyMap();
+ }
+
+ /**
+  * Get all the {@link InputDescriptor}s to this application.
+  *
+  * @return an immutable map of {@code streamId} to {@link InputDescriptor}
+  */
+ public abstract Map<String, InputDescriptor> getInputDescriptors();
+
+ /**
+  * Get all the {@link OutputDescriptor}s from this application.
+  *
+  * @return an immutable map of {@code streamId} to {@link OutputDescriptor}
+  */
+ public abstract Map<String, OutputDescriptor> getOutputDescriptors();
+
+ /**
+  * Get the {@code streamId}s of all the broadcast streams in this application.
+  *
+  * @return an immutable set of broadcast {@code streamId}s
+  */
+ public abstract Set<String> getBroadcastStreams();
+
+ /**
+  * Get all the {@link TableDescriptor}s in this application.
+  *
+  * @return an immutable set of {@link TableDescriptor}s
+  */
+ public abstract Set<TableDescriptor> getTableDescriptors();
+
+ /**
+  * Get all the unique {@link SystemDescriptor}s in this application.
+  *
+  * @return an immutable set of {@link SystemDescriptor}s
+  */
+ public abstract Set<SystemDescriptor> getSystemDescriptors();
+
+ /**
+  * Get all the unique input {@code streamId}s in this application.
+  *
+  * @return an immutable set of input {@code streamId}s
+  */
+ public abstract Set<String> getInputStreamIds();
+
+ /**
+  * Get all the unique output {@code streamId}s in this application.
+  *
+  * @return an immutable set of output {@code streamId}s
+  */
+ public abstract Set<String> getOutputStreamIds();
+
+ KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
+ Serde keySerde, valueSerde;
+
+ KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId);
+
+ if (serde instanceof KVSerde) {
+ keySerde = ((KVSerde) serde).getKeySerde();
+ valueSerde = ((KVSerde) serde).getValueSerde();
+ } else {
+ keySerde = new NoOpSerde();
+ valueSerde = serde;
+ }
+
+ if (currentSerdePair == null) {
+ if (keySerde instanceof NoOpSerde) {
+ LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
+ ". Keys will not be (de)serialized");
+ }
+ if (valueSerde instanceof NoOpSerde) {
+ LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
+ ". Values will not be (de)serialized");
+ }
+ streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
+ } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+ throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
+ + "different serdes.", streamId));
+ }
+ return streamSerdes.get(streamId);
+ }
+
+ /**
+  * Registers the key and value {@link Serde}s for a table, or validates them against the pair already registered.
+  *
+  * @param tableId id of the table
+  * @param kvSerde the {@link KVSerde} supplying the key and value serdes of the table
+  * @return the key/value serde pair registered for {@code tableId}
+  * @throws IllegalArgumentException if a serde pair that is not equal to the requested one is already registered
+  *         for {@code tableId}
+  */
+ KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) {
+   Serde keySerde = kvSerde.getKeySerde();
+   Serde valueSerde = kvSerde.getValueSerde();
+
+   if (!tableSerdes.containsKey(tableId)) {
+     tableSerdes.put(tableId, KV.of(keySerde, valueSerde));
+     return tableSerdes.get(tableId);
+   }
+
+   KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId);
+   if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
+     throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to "
+         + "different serdes.", tableId));
+   }
+   // Return the pair from tableSerdes. This used to read streamSerdes, which yields null (or the serdes of an
+   // unrelated stream that happens to share the id) for an already-registered table.
+   return currentSerdePair;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java
new file mode 100644
index 0000000..e3c2d5c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Static helper that builds the {@link ApplicationDescriptorImpl} matching a {@link SamzaApplication} and its
+ * {@link Config}.
+ */
+public class ApplicationDescriptorUtil {
+
+  // Static helper only; never instantiated.
+  private ApplicationDescriptorUtil() {
+
+  }
+
+  /**
+   * Builds the {@link ApplicationDescriptorImpl} for the given application: a {@link StreamApplicationDescriptorImpl}
+   * for a {@link StreamApplication}, a {@link TaskApplicationDescriptorImpl} for a {@link TaskApplication}.
+   *
+   * @param app an implementation of {@link SamzaApplication}. The {@code app} has to have a proper fully-qualified class name.
+   * @param config the {@link Config} for the application
+   * @return the {@link ApplicationDescriptorImpl} instance created from {@code app} and {@code config}
+   * @throws IllegalArgumentException if {@code app} is neither a {@link StreamApplication} nor a {@link TaskApplication}
+   */
+  public static ApplicationDescriptorImpl<? extends ApplicationDescriptor> getAppDescriptor(SamzaApplication app, Config config) {
+    final ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor;
+    if (app instanceof StreamApplication) {
+      descriptor = new StreamApplicationDescriptorImpl((StreamApplication) app, config);
+    } else if (app instanceof TaskApplication) {
+      descriptor = new TaskApplicationDescriptorImpl((TaskApplication) app, config);
+    } else {
+      throw new IllegalArgumentException(String.format("User application class %s is not supported. Only StreamApplication "
+          + "and TaskApplication are supported.", app.getClass().getName()));
+    }
+    return descriptor;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
new file mode 100644
index 0000000..ec36cf3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.system.descriptors.InputTransformer;
+import org.apache.samza.system.descriptors.StreamExpander;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class defines:
+ * 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link MessageStream}, {@link OutputStream},
+ *    and {@link Table} to create the DAG of transforms.
+ * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
+ */
+public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
+    implements StreamApplicationDescriptor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
+  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
+  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+  // We use a LHM for deterministic order in initializing and closing operators.
+  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<String, TableImpl> tables = new LinkedHashMap<>();
+  private final Set<String> operatorIds = new HashSet<>();
+
+  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
+
+  /**
+   * The 0-based position of the next operator in the graph.
+   * Part of the unique ID for each OperatorSpec in the graph.
+   * Should only be accessed and incremented via {@link #getNextOpId(OpCode, String)}.
+   */
+  private int nextOpNum = 0;
+
+  /**
+   * Creates the descriptor and immediately lets {@code userApp} describe itself against it.
+   *
+   * @param userApp the user {@link StreamApplication}
+   * @param config the {@link Config} for the application
+   */
+  public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) {
+    super(userApp, config);
+    userApp.describe(this);
+  }
+
+  /**
+   * Sets the default {@link SystemDescriptor}. Must be called before any input or output stream is created.
+   *
+   * @param defaultSystemDescriptor the default system descriptor; must not be null
+   * @return this descriptor
+   */
+  @Override
+  public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
+    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
+        "Default system must be set before creating any input or output streams.");
+    addSystemDescriptor(defaultSystemDescriptor);
+
+    defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
+    return this;
+  }
+
+  /**
+   * Creates the input {@link MessageStream} for the given {@link InputDescriptor}. If the descriptor's system
+   * provides a {@link StreamExpander}, stream creation is delegated to it entirely.
+   *
+   * @param inputDescriptor the descriptor of the input stream
+   * @param <M> the type of messages in the stream
+   * @return the input {@link MessageStream}
+   * @throws IllegalStateException if the streamId was already used for an input stream, if a different system
+   *         descriptor instance is already registered for the same system name, or if the stream is also an
+   *         output stream with different serdes
+   */
+  @Override
+  public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
+    SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
+    Optional<StreamExpander> expander = systemDescriptor.getExpander();
+    if (expander.isPresent()) {
+      return expander.get().apply(this, inputDescriptor);
+    }
+
+    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+    String streamId = inputDescriptor.getStreamId();
+    // Both duplicate-id checks run before anything is registered, so a duplicate streamId (for example one already
+    // claimed by getIntermediateStream) is rejected without leaving a stale entry in inputDescriptors.
+    Preconditions.checkState(!inputDescriptors.containsKey(streamId),
+        String.format("add input descriptors multiple times with the same streamId: %s", streamId));
+    Preconditions.checkState(!inputOperators.containsKey(streamId),
+        "getInputStream must not be called multiple times with the same streamId: " + streamId);
+    inputDescriptors.put(streamId, inputDescriptor);
+    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+
+    Serde serde = inputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
+    if (outputStreams.containsKey(streamId)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
+      Serde keySerde = outputStream.getKeySerde();
+      Serde valueSerde = outputStream.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamId, inputOperatorSpec);
+    return new MessageStreamImpl(this, inputOperators.get(streamId));
+  }
+
+  /**
+   * Creates the {@link OutputStream} for the given {@link OutputDescriptor}.
+   *
+   * @param outputDescriptor the descriptor of the output stream
+   * @param <M> the type of messages in the stream
+   * @return the {@link OutputStream}
+   * @throws IllegalStateException if the streamId was already used for an output stream, if a different system
+   *         descriptor instance is already registered for the same system name, or if the stream is also an
+   *         input stream with different serdes
+   */
+  @Override
+  public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
+    String streamId = outputDescriptor.getStreamId();
+    // Both duplicate-id checks run before anything is registered, so a duplicate streamId is rejected without
+    // leaving a stale entry in outputDescriptors.
+    Preconditions.checkState(!outputDescriptors.containsKey(streamId),
+        String.format("add output descriptors multiple times with the same streamId: %s", streamId));
+    Preconditions.checkState(!outputStreams.containsKey(streamId),
+        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
+    outputDescriptors.put(streamId, outputDescriptor);
+    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+
+    Serde serde = outputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
+    if (inputOperators.containsKey(streamId)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
+      Serde keySerde = inputOperatorSpec.getKeySerde();
+      Serde valueSerde = inputOperatorSpec.getValueSerde();
+      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+              + "stream level, so the same key and message Serde must be used for both.", streamId));
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return outputStreams.get(streamId);
+  }
+
+  /**
+   * Creates the {@link Table} for the given {@link TableDescriptor}. For a {@link BaseHybridTableDescriptor}, the
+   * tables of all its nested descriptors are created first.
+   *
+   * @param tableDescriptor the descriptor of the table
+   * @param <K> the type of the table key
+   * @param <V> the type of the table value
+   * @return the {@link Table}
+   * @throws IllegalStateException if the tableId is blank, does not match the allowed id pattern, or was used before
+   */
+  @Override
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
+
+    if (tableDescriptor instanceof BaseHybridTableDescriptor) {
+      List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
+      tableDescs.forEach(td -> getTable(td));
+    }
+
+    String tableId = tableDescriptor.getTableId();
+    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
+        String.format("tableId: %s must conform to pattern: %s", tableId, ID_PATTERN.toString()));
+    Preconditions.checkState(!tableDescriptors.containsKey(tableId),
+        String.format("add table descriptors multiple times with the same tableId: %s", tableId));
+    tableDescriptors.put(tableId, tableDescriptor);
+
+    BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
+    TableSpec tableSpec = baseTableDescriptor.getTableSpec();
+    if (tables.containsKey(tableSpec.getId())) {
+      throw new IllegalStateException(
+          String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
+    }
+    tables.put(tableSpec.getId(), new TableImpl(tableSpec));
+    getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
+    return tables.get(tableSpec.getId());
+  }
+
+  /**
+   * Get all the {@link InputDescriptor}s to this application
+   *
+   * @return an immutable map of streamId to {@link InputDescriptor}
+   */
+  @Override
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
+  }
+
+  /**
+   * Get all the {@link OutputDescriptor}s from this application
+   *
+   * @return an immutable map of streamId to {@link OutputDescriptor}
+   */
+  @Override
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
+
+  /**
+   * Get all the broadcast streamIds from this application
+   *
+   * @return an immutable set of streamIds
+   */
+  @Override
+  public Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
+  /**
+   * Get all the {@link TableDescriptor}s in this application
+   *
+   * @return an immutable set of {@link TableDescriptor}s
+   */
+  @Override
+  public Set<TableDescriptor> getTableDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+  }
+
+  /**
+   * Get all the unique {@link SystemDescriptor}s in this application
+   *
+   * @return an immutable set of {@link SystemDescriptor}s
+   */
+  @Override
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    // We enforce that users must not use different system descriptor instances for the same system name
+    // when getting an input/output stream or setting the default system descriptor
+    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+  }
+
+  /**
+   * Get all the unique input streamIds in this application, including intermediate streams
+   *
+   * @return an immutable set of input streamIds
+   */
+  @Override
+  public Set<String> getInputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet()));
+  }
+
+  /**
+   * Get all the unique output streamIds in this application, including intermediate streams
+   *
+   * @return an immutable set of output streamIds
+   */
+  @Override
+  public Set<String> getOutputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
+  }
+
+  /**
+   * Get the default {@link SystemDescriptor} in this application
+   *
+   * @return the default {@link SystemDescriptor}; empty if {@link #withDefaultSystem(SystemDescriptor)} was not called
+   */
+  @Override
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    return defaultSystemDescriptorOptional;
+  }
+
+  /**
+   * Creates a new {@link OperatorSpecGraph} from the current state of this descriptor.
+   *
+   * @return a new {@link OperatorSpecGraph}
+   */
+  public OperatorSpecGraph getOperatorSpecGraph() {
+    return new OperatorSpecGraph(this);
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
+   * <p>
+   * The operator counter advances on every successful call, whether or not a user-defined ID was supplied.
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @param userDefinedId the optional user-provided name of the next operator or null
+   * @return the unique ID for the next operator in the graph
+   * @throws SamzaException if {@code userDefinedId} contains disallowed characters or the resulting ID is already used
+   */
+  public String getNextOpId(OpCode opCode, String userDefinedId) {
+    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
+    }
+
+    // Lower-case with Locale.ROOT so the generated ID does not depend on the JVM default locale (for example, a
+    // Turkish default locale would turn "INPUT" into a string containing a dotless i).
+    String nextOpId = String.format("%s-%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        opCode.name().toLowerCase(java.util.Locale.ROOT),
+        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
+    if (!operatorIds.add(nextOpId)) {
+      throw new SamzaException(
+          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
+    }
+    nextOpNum++;
+    return nextOpId;
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-nextOpNum;
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @return the unique ID for the next operator in the graph
+   */
+  public String getNextOpId(OpCode opCode) {
+    return getNextOpId(opCode, null);
+  }
+
+  /**
+   * Get the map of all {@link InputOperatorSpec}s in this application
+   *
+   * @return an immutable map from streamId to {@link InputOperatorSpec}
+   */
+  @Override
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.unmodifiableMap(inputOperators);
+  }
+
+  /**
+   * Get the map of all {@link OutputStreamImpl}s in this application
+   *
+   * @return an immutable map from streamId to {@link OutputStreamImpl}
+   */
+  public Map<String, OutputStreamImpl> getOutputStreams() {
+    return Collections.unmodifiableMap(outputStreams);
+  }
+
+  /**
+   * Get the map of all {@link TableImpl}s in this application
+   *
+   * @return an immutable map from table id to {@link TableImpl}
+   */
+  public Map<String, TableImpl> getTables() {
+    return Collections.unmodifiableMap(tables);
+  }
+
+  /**
+   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
+   * An intermediate {@link MessageStream} is both an output and an input stream.
+   *
+   * @param streamId the id of the stream to be created.
+   * @param serde the {@link Serde} to use for the message in the intermediate stream. Must not be null.
+   * @param isBroadcast whether the stream is a broadcast stream.
+   * @param <M> the type of messages in the intermediate {@link MessageStream}
+   * @return the intermediate {@link MessageStreamImpl}
+   * @throws NullPointerException if {@code serde} is null
+   * @throws IllegalStateException if {@code streamId} is already used by an input or output stream
+   */
+  @VisibleForTesting
+  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
+    Preconditions.checkNotNull(serde, "serde must not be null for intermediate stream: " + streamId);
+    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
+        "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
+
+    if (isBroadcast) {
+      broadcastStreams.add(streamId);
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
+
+    // Intermediate streams take the transformer of the default system, when one is set.
+    InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
+        .flatMap(SystemDescriptor::getTransformer).orElse(null);
+
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+    inputOperators.put(streamId, inputOperatorSpec);
+    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
+  }
+
+  // check uniqueness of the {@code systemDescriptor} and add if it is unique
+  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+    // Identity comparison on purpose: re-registering the very same instance is allowed, a different instance for
+    // the same system name is not.
+    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
+        || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
+        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
new file mode 100644
index 0000000..cb924ab
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application.descriptors;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.task.TaskFactory;
+
+
+/**
+ * This class implements interface {@link TaskApplicationDescriptor}.
+ * <p>
+ * In addition to the common objects for an application defined in {@link ApplicationDescriptorImpl}, this class also includes
+ * the low-level {@link TaskFactory} that creates user-defined task instances, the lists of input/broadcast/output streams,
+ * and the list of {@link TableDescriptor}s used in the application.
+ */
+public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
+ implements TaskApplicationDescriptor {
+
+ private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+ private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+ private final Set<String> broadcastStreams = new HashSet<>();
+ private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+ private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+
+ private TaskFactory taskFactory = null;
+
+ public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
+ super(userApp, config);
+ userApp.describe(this);
+ }
+
+ @Override
+ public void setTaskFactory(TaskFactory factory) {
+ this.taskFactory = factory;
+ }
+
+ @Override
+ public void addInputStream(InputDescriptor inputDescriptor) {
+ // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+ Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
+ String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+ getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
+ inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
+ addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+ }
+
+ @Override
+ public void addOutputStream(OutputDescriptor outputDescriptor) {
+ Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
+ String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+ getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
+ outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
+ addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+ }
+
+ @Override
+ public void addTable(TableDescriptor tableDescriptor) {
+ Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
+ String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+ getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
+ tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
+ }
+
+ @Override
+ public Map<String, InputDescriptor> getInputDescriptors() {
+ return Collections.unmodifiableMap(inputDescriptors);
+ }
+
+ @Override
+ public Map<String, OutputDescriptor> getOutputDescriptors() {
+ return Collections.unmodifiableMap(outputDescriptors);
+ }
+
+ @Override
+ public Set<String> getBroadcastStreams() {
+ return Collections.unmodifiableSet(broadcastStreams);
+ }
+
+ @Override
+ public Set<TableDescriptor> getTableDescriptors() {
+ return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+ }
+
+ @Override
+ public Set<SystemDescriptor> getSystemDescriptors() {
+ // We enforce that users must not use different system descriptor instances for the same system name
+ // when getting an input/output stream or setting the default system descriptor
+ return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+ }
+
+ @Override
+ public Set<String> getInputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
+ }
+
+ @Override
+ public Set<String> getOutputStreamIds() {
+ return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
+ }
+
+ /**
+ * Get the user-defined {@link TaskFactory}
+ * @return the {@link TaskFactory} object
+ */
+ public TaskFactory getTaskFactory() {
+ return taskFactory;
+ }
+
+ // check uniqueness of the {@code systemDescriptor} and add if it is unique
+ private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+ Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
+ || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
+ "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
+ systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
index ed013c4..06f5606 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
@@ -59,9 +59,9 @@ public class JavaTableConfig extends MapConfig {
}
/**
- * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table
+ * Get the {@link org.apache.samza.table.descriptors.TableProviderFactory} class for a table
* @param tableId Id of the table
- * @return the {@link org.apache.samza.table.TableProviderFactory} class name
+ * @return the {@link org.apache.samza.table.descriptors.TableProviderFactory} class name
*/
public String getTableProviderFactory(String tableId) {
return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index b80f7df..10a4215 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -35,15 +35,15 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.ListUtils;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index f43b24e..6d9faf3 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -30,8 +30,8 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 18705e4..4a2a235 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -200,8 +200,8 @@ import org.codehaus.jackson.map.ObjectMapper;
}
/**
- * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.application.ApplicationDescriptorImpl}
- * for this job
+ * Create JSON POJO for a {@link JobNode}, including the
+ * {@link org.apache.samza.application.descriptors.ApplicationDescriptorImpl} for this job
*
* @param jobNode job node in the {@link JobGraph}
* @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index af556f5..82b4178 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -26,8 +26,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index 83f3f61..dc0fc59 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -26,8 +26,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 86aca0f..6ca5f3d 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -23,8 +23,8 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
index 54f86d5..13b29df 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -21,8 +21,8 @@ package org.apache.samza.execution;
import java.util.List;
import java.util.UUID;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.ApplicationDescriptor;
-import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
deleted file mode 100644
index dd47af2..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.TableSpec;
-
-
-/**
- * Base class for all table descriptor implementations; it holds the table id, the key/value serde and the config entries.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- * @param <D> the type of the concrete table descriptor; {@link #withConfig} returns {@code this} cast to it
- */
-abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
- implements TableDescriptor<K, V, D> {
-
- protected final String tableId;
-
- protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
-
- protected final Map<String, String> config = new HashMap<>();
-
- /**
- * Constructs a table descriptor instance that keeps the default serde built from two {@link NoOpSerde} instances
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+} (not checked by this constructor)
- */
- protected BaseTableDescriptor(String tableId) {
- this.tableId = tableId;
- }
-
- /**
- * Constructs a table descriptor instance that uses the given serde instead of the default one
- * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+} (not checked by this constructor)
- * @param serde the serde for key and value
- */
- protected BaseTableDescriptor(String tableId, KVSerde<K, V> serde) {
- this.tableId = tableId;
- this.serde = serde;
- }
-
- /**
- * {@inheritDoc} This implementation puts the pair into the descriptor's config map and returns {@code this}.
- */
- @Override
- public D withConfig(String key, String value) {
- config.put(key, value);
- return (D) this; // unchecked cast; assumes each subclass supplies its own type as D
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public String getTableId() {
- return tableId;
- }
-
- /**
- * Get the serde assigned to this {@link TableDescriptor}
- *
- * @return {@link KVSerde} used by this table
- */
- public KVSerde<K, V> getSerde() {
- return serde;
- }
-
- /**
- * Generate config for {@link TableSpec} by copying every entry of this descriptor's config map; this method is used internally.
- * @param tableSpecConfig configuration for the {@link TableSpec}; this map is modified in place
- */
- protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
- tableSpecConfig.putAll(config);
- }
-
- /**
- * Validate that this table descriptor is constructed properly; this method is used internally. The base implementation is empty.
- */
- protected void validate() {
- }
-
- /**
- * Create a {@link TableSpec} from this table descriptor; this method is used internally.
- *
- * @return the {@link TableSpec}
- */
- abstract public TableSpec getTableSpec();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 09e4868..0f43c5e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -24,7 +24,7 @@ import java.time.Duration;
import java.util.Collection;
import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index 5329fd7..a83739d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -25,7 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
deleted file mode 100644
index 6c4ae49..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.descriptors;
-
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for Samza framework internal usage.
- * <p>
- * Allows creating a {@link SystemDescriptor} without setting the factory class name, and delegating
- * the rest of the system customization to configurations.
- * <p>
- * Useful for code-generation and testing use cases where the factory name is not known in advance.
- */
-@SuppressWarnings("unchecked")
-public final class DelegatingSystemDescriptor extends SystemDescriptor<DelegatingSystemDescriptor>
- implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
-
- /**
- * Constructs a {@link DelegatingSystemDescriptor} instance with no system level serde.
- * Serdes must be provided explicitly at stream level when getting input or output descriptors.
- * SystemFactory class name must be provided in configuration.
- *
- * @param systemName name of this system
- */
- @VisibleForTesting
- public DelegatingSystemDescriptor(String systemName) {
- super(systemName, null, null, null); // NOTE(review): the nulls presumably include the factory class name and system-level serde the Javadoc calls unset -- confirm against SystemDescriptor
- }
-
- @Override
- public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor(
- String streamId, Serde<StreamMessageType> serde) {
- return new GenericInputDescriptor<>(streamId, this, serde);
- }
-
- @Override
- public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor(
- String streamId, Serde<StreamMessageType> serde) {
- return new GenericOutputDescriptor<>(streamId, this, serde);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
index 2a73064..fbeda3e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -20,7 +20,7 @@ package org.apache.samza.operators.impl;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.system.IncomingMessageEnvelope;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 1af4806..b467d60 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -18,7 +18,7 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.serializers.Serde;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 0442f7c..1886d1b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -103,7 +103,8 @@ public abstract class OperatorSpec<M, OM> implements Serializable {
}
/**
- * Get the unique ID of this operator in the {@link org.apache.samza.application.StreamApplicationDescriptorImpl}.
+ * Get the unique ID of this operator in the
+ * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl}.
* @return the unique operator ID
*/
public final String getOpId() {
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 6ebbdae..8d3ff60 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -22,7 +22,7 @@ package org.apache.samza.operators.spec;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.system.descriptors.InputTransformer;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index 4db8e60..44f62d9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -18,7 +18,7 @@
*/
package org.apache.samza.operators.stream;
-import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.spec.InputOperatorSpec;