You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/07 06:36:14 UTC
[8/9] samza git commit: SAMZA-1789: unify ApplicationDescriptor and
ApplicationRunner for high- and low-level APIs in YARN and standalone
environment
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
new file mode 100644
index 0000000..acf4bf7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Util class to help creating {@link ApplicationDescriptorImpl} instance from {@link SamzaApplication} and {@link Config}
+ */
+public class ApplicationDescriptorUtil {
+
+ private ApplicationDescriptorUtil() {
+
+ }
+
+ /**
+ * Create a new instance of {@link ApplicationDescriptorImpl} based on {@link SamzaApplication} and {@link Config}
+ *
+ * @param app an implementation of {@link SamzaApplication}. The {@code app} has to have a proper fully-qualified class name.
+ * @param config the {@link Config} for the application
+ * @return the {@link ApplicationDescriptorImpl} instance containing the processing logic and the config
+ */
+ public static ApplicationDescriptorImpl<? extends ApplicationDescriptor> getAppDescriptor(SamzaApplication app, Config config) {
+ if (app instanceof StreamApplication) {
+ return new StreamApplicationDescriptorImpl((StreamApplication) app, config);
+ }
+ if (app instanceof TaskApplication) {
+ return new TaskApplicationDescriptorImpl((TaskApplication) app, config);
+ }
+ throw new IllegalArgumentException(String.format("User application class %s is not supported. Only StreamApplication "
+ + "and TaskApplication are supported.", app.getClass().getName()));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
new file mode 100644
index 0000000..b39ad3c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.TaskConfig;
+import scala.Option;
+
+
+/**
+ * Util class to create {@link SamzaApplication} from the configuration.
+ */
+public class ApplicationUtil {
+
+ /**
+ * Creates the {@link SamzaApplication} object from the task or application class name specified in {@code config}
+ *
+ * @param config the configuration of the application
+ * @return the {@link SamzaApplication} object
+ */
+ public static SamzaApplication fromConfig(Config config) {
+ String appClassName = new ApplicationConfig(config).getAppClass();
+ if (StringUtils.isNotBlank(appClassName)) {
+ // app.class is configured
+ try {
+ Class<SamzaApplication> appClass = (Class<SamzaApplication>) Class.forName(appClassName);
+ if (StreamApplication.class.isAssignableFrom(appClass) || TaskApplication.class.isAssignableFrom(appClass)) {
+ return appClass.newInstance();
+ }
+ } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
+ throw new ConfigException(String.format("Loading app.class %s failed. The user application has to implement "
+ + "StreamApplication or TaskApplication.", appClassName), e);
+ }
+ }
+ // no app.class defined. It has to be a legacy application with task.class configuration
+ Option<String> taskClassOption = new TaskConfig(config).getTaskClass();
+ if (!taskClassOption.isDefined() || !StringUtils.isNotBlank(taskClassOption.getOrElse(null))) {
+ // no task.class defined either. This is wrong.
+ throw new ConfigException("Legacy task applications must set a non-empty task.class in configuration.");
+ }
+ return new LegacyTaskApplication(taskClassOption.get());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
new file mode 100644
index 0000000..e9e2635
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/LegacyTaskApplication.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import org.apache.samza.task.TaskFactoryUtil;
+
+/**
+ * Default {@link TaskApplication} for legacy applications that only configure a task class.
+ * <p>
+ * Its {@link #describe(TaskApplicationDescriptor)} registers a task factory obtained from
+ * {@link TaskFactoryUtil#getTaskFactory} for the task class name given at construction time.
+ */
+public final class LegacyTaskApplication implements TaskApplication {
+  // Fully-qualified name of the legacy task class to build the task factory from.
+  private final String legacyTaskClassName;
+
+  /**
+   * @param taskClassName fully-qualified name of the legacy task class
+   */
+  public LegacyTaskApplication(String taskClassName) {
+    legacyTaskClassName = taskClassName;
+  }
+
+  @Override
+  public void describe(TaskApplicationDescriptor appDesc) {
+    appDesc.setTaskFactory(TaskFactoryUtil.getTaskFactory(legacyTaskClassName));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
new file mode 100644
index 0000000..ae7a45d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.operators.functions.StreamExpander;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class defines:
+ * 1) an implementation of {@link StreamApplicationDescriptor} that provides APIs to access {@link MessageStream}, {@link OutputStream},
+ * and {@link Table} to create the DAG of transforms.
+ * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
+ * <p>
+ * The public registration methods validate their arguments before changing any internal state, so a rejected call
+ * leaves the descriptor unchanged.
+ */
+public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
+    implements StreamApplicationDescriptor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
+  // Pattern that table IDs and user-defined operator IDs must match.
+  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
+
+  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
+  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+  // We use a LHM for deterministic order in initializing and closing operators.
+  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
+  private final Set<String> operatorIds = new HashSet<>();
+
+  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
+
+  /**
+   * The 0-based position of the next operator in the graph.
+   * Part of the unique ID for each OperatorSpec in the graph.
+   * Should only be accessed and incremented via {@link #getNextOpId(OpCode, String)}.
+   */
+  private int nextOpNum = 0;
+
+  /**
+   * Creates the descriptor and immediately calls {@link StreamApplication#describe} on {@code userApp}, which
+   * registers the user's streams, tables and operators with this descriptor.
+   *
+   * @param userApp the user's {@link StreamApplication}
+   * @param config the {@link Config} for the application
+   */
+  public StreamApplicationDescriptorImpl(StreamApplication userApp, Config config) {
+    super(userApp, config);
+    userApp.describe(this);
+  }
+
+  @Override
+  public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
+    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
+        "Default system must be set before creating any input or output streams.");
+    addSystemDescriptor(defaultSystemDescriptor);
+
+    defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
+    return this;
+  }
+
+  @Override
+  public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
+    SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
+    Optional<StreamExpander> expander = systemDescriptor.getExpander();
+    if (expander.isPresent()) {
+      // Stream creation is delegated entirely to the system's expander.
+      return expander.get().apply(this, inputDescriptor);
+    }
+
+    // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+    String streamId = inputDescriptor.getStreamId();
+
+    // Validate first (same order of checks as before), mutate only once everything has passed.
+    Preconditions.checkState(!inputDescriptors.containsKey(streamId),
+        "add input descriptors multiple times with the same streamId: %s", streamId);
+    checkSystemDescriptor(systemDescriptor);
+    Preconditions.checkState(!inputOperators.containsKey(streamId),
+        "getInputStream must not be called multiple times with the same streamId: %s", streamId);
+
+    Serde serde = inputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    if (outputStreams.containsKey(streamId)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
+      checkSerdesMatch(streamId, kvSerdes, outputStream.getKeySerde(), outputStream.getValueSerde());
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+
+    addSystemDescriptor(systemDescriptor);
+    inputDescriptors.put(streamId, inputDescriptor);
+    inputOperators.put(streamId, inputOperatorSpec);
+    return new MessageStreamImpl(this, inputOperatorSpec);
+  }
+
+  @Override
+  public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
+    String streamId = outputDescriptor.getStreamId();
+    SystemDescriptor systemDescriptor = outputDescriptor.getSystemDescriptor();
+
+    // Validate first (same order of checks as before), mutate only once everything has passed.
+    Preconditions.checkState(!outputDescriptors.containsKey(streamId),
+        "add output descriptors multiple times with the same streamId: %s", streamId);
+    checkSystemDescriptor(systemDescriptor);
+    Preconditions.checkState(!outputStreams.containsKey(streamId),
+        "getOutputStream must not be called multiple times with the same streamId: %s", streamId);
+
+    Serde serde = outputDescriptor.getSerde();
+    KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+    if (inputOperators.containsKey(streamId)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
+      checkSerdesMatch(streamId, kvSerdes, inputOperatorSpec.getKeySerde(), inputOperatorSpec.getValueSerde());
+    }
+
+    boolean isKeyed = serde instanceof KVSerde;
+    addSystemDescriptor(systemDescriptor);
+    outputDescriptors.put(streamId, outputDescriptor);
+    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return outputStreams.get(streamId);
+  }
+
+  @Override
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
+    String tableId = tableDescriptor.getTableId();
+    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
+        "tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN);
+    Preconditions.checkState(!tableDescriptors.containsKey(tableId),
+        "add table descriptors multiple times with the same tableId: %s", tableId);
+
+    TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec();
+    if (tables.containsKey(tableSpec)) {
+      throw new IllegalStateException(
+          String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
+    }
+
+    tableDescriptors.put(tableId, tableDescriptor);
+    TableImpl table = new TableImpl(tableSpec);
+    tables.put(tableSpec, table);
+    return table;
+  }
+
+  /**
+   * Get all the {@link InputDescriptor}s to this application
+   *
+   * @return an immutable map of streamId to {@link InputDescriptor}
+   */
+  @Override
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
+  }
+
+  /**
+   * Get all the {@link OutputDescriptor}s from this application
+   *
+   * @return an immutable map of streamId to {@link OutputDescriptor}
+   */
+  @Override
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
+
+  /**
+   * Get all the broadcast streamIds from this application
+   *
+   * @return an immutable set of streamIds
+   */
+  @Override
+  public Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
+  /**
+   * Get all the {@link TableDescriptor}s in this application
+   *
+   * @return an immutable set of {@link TableDescriptor}s
+   */
+  @Override
+  public Set<TableDescriptor> getTableDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+  }
+
+  /**
+   * Get all the unique {@link SystemDescriptor}s in this application
+   *
+   * @return an immutable set of {@link SystemDescriptor}s
+   */
+  @Override
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    // We enforce that users must not use different system descriptor instances for the same system name
+    // when getting an input/output stream or setting the default system descriptor
+    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+  }
+
+  /**
+   * Get the default {@link SystemDescriptor} in this application
+   *
+   * @return the default {@link SystemDescriptor}
+   */
+  @Override
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    return defaultSystemDescriptorOptional;
+  }
+
+  /**
+   * Builds a new {@link OperatorSpecGraph} from the current state of this descriptor.
+   *
+   * @return a new {@link OperatorSpecGraph} for this application
+   */
+  public OperatorSpecGraph getOperatorSpecGraph() {
+    return new OperatorSpecGraph(this);
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-(userDefinedId|nextOpNum);
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @param userDefinedId the optional user-provided name of the next operator or null
+   * @return the unique ID for the next operator in the graph
+   * @throws SamzaException if {@code userDefinedId} contains disallowed characters or the resulting ID is a duplicate
+   */
+  public String getNextOpId(OpCode opCode, String userDefinedId) {
+    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
+    }
+
+    String nextOpId = String.format("%s-%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        // Locale.ROOT keeps IDs independent of the JVM default locale (a Turkish locale would turn "INPUT" into "ınput").
+        opCode.name().toLowerCase(Locale.ROOT),
+        StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
+    if (!operatorIds.add(nextOpId)) {
+      throw new SamzaException(
+          String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
+    }
+    nextOpNum++;
+    return nextOpId;
+  }
+
+  /**
+   * Gets the unique ID for the next operator in the graph. The ID is of the following format:
+   * jobName-jobId-opCode-nextOpNum;
+   *
+   * @param opCode the {@link OpCode} of the next operator
+   * @return the unique ID for the next operator in the graph
+   */
+  public String getNextOpId(OpCode opCode) {
+    return getNextOpId(opCode, null);
+  }
+
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.unmodifiableMap(inputOperators);
+  }
+
+  public Map<String, OutputStreamImpl> getOutputStreams() {
+    return Collections.unmodifiableMap(outputStreams);
+  }
+
+  public Map<TableSpec, TableImpl> getTables() {
+    return Collections.unmodifiableMap(tables);
+  }
+
+  /**
+   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
+   * An intermediate {@link MessageStream} is both an output and an input stream.
+   *
+   * @param streamId the id of the stream to be created.
+   * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
+   *              is used.
+   * @param isBroadcast whether the stream is a broadcast stream.
+   * @param <M> the type of messages in the intermediate {@link MessageStream}
+   * @return the intermediate {@link MessageStreamImpl}
+   */
+  @VisibleForTesting
+  public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
+    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
+        "getIntermediateStream must not be called multiple times with the same streamId: %s", streamId);
+
+    if (serde == null) {
+      LOGGER.info("No serde provided for intermediate stream: {}. Key and message serdes configured for the "
+          + "job.default.system will be used.", streamId);
+    }
+
+    boolean isKeyed;
+    KV<Serde, Serde> kvSerdes;
+    if (serde == null) { // if no explicit serde available
+      isKeyed = true; // assume keyed stream
+      kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs
+    } else {
+      isKeyed = serde instanceof KVSerde;
+      kvSerdes = getKVSerdes(streamId, serde);
+    }
+
+    InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor()
+        .flatMap(SystemDescriptor::getTransformer).orElse(null);
+
+    InputOperatorSpec inputOperatorSpec =
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
+            transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
+
+    // Record the broadcast flag only once the operator has been created successfully.
+    if (isBroadcast) {
+      broadcastStreams.add(streamId);
+    }
+    inputOperators.put(streamId, inputOperatorSpec);
+    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
+  }
+
+  // Splits a serde into key/value serdes; a non-KV serde is used for values with a NoOpSerde for keys.
+  private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
+    Serde keySerde, valueSerde;
+
+    if (serde instanceof KVSerde) {
+      keySerde = ((KVSerde) serde).getKeySerde();
+      valueSerde = ((KVSerde) serde).getValueSerde();
+    } else {
+      keySerde = new NoOpSerde();
+      valueSerde = serde;
+    }
+
+    if (keySerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the key serde for stream {}. Keys will not be (de)serialized", streamId);
+    }
+    if (valueSerde instanceof NoOpSerde) {
+      LOGGER.info("Using NoOpSerde as the value serde for stream {}. Values will not be (de)serialized", streamId);
+    }
+
+    return KV.of(keySerde, valueSerde);
+  }
+
+  // Serde in Samza is per stream: a stream used both as input and output must use identical key and value serdes.
+  private static void checkSerdesMatch(String streamId, KV<Serde, Serde> kvSerdes, Serde keySerde, Serde valueSerde) {
+    Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
+        "Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+            + "stream level, so the same key and message Serde must be used for both.", streamId);
+  }
+
+  // Throws if a different descriptor instance is already registered under the same system name; does not register.
+  private void checkSystemDescriptor(SystemDescriptor systemDescriptor) {
+    SystemDescriptor existing = systemDescriptors.get(systemDescriptor.getSystemName());
+    Preconditions.checkState(existing == null || existing == systemDescriptor,
+        "Must not use different system descriptor instances for the same system name: %s",
+        systemDescriptor.getSystemName());
+  }
+
+  // check uniqueness of the {@code systemDescriptor} and add if it is unique
+  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+    checkSystemDescriptor(systemDescriptor);
+    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
new file mode 100644
index 0000000..3597d7c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.task.TaskFactory;
+
+
+/**
+ * This class implements interface {@link TaskApplicationDescriptor}.
+ * <p>
+ * In addition to the common objects for an application defined in {@link ApplicationDescriptorImpl}, this class also includes
+ * the low-level {@link TaskFactory} that creates user-defined task instances, the lists of input/broadcast/output streams,
+ * and the list of {@link TableDescriptor}s used in the application.
+ */
+public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
+ implements TaskApplicationDescriptor {
+
+ private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+ private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+ private final Set<String> broadcastStreams = new HashSet<>();
+ private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+ private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+
+ private TaskFactory taskFactory = null;
+
+ public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
+ super(userApp, config);
+ userApp.describe(this);
+ }
+
+ @Override
+ public void setTaskFactory(TaskFactory factory) {
+ this.taskFactory = factory;
+ }
+
+ @Override
+ public void addInputStream(InputDescriptor inputDescriptor) {
+ // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
+ Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
+ String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+ inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
+ addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+ }
+
+ @Override
+ public void addOutputStream(OutputDescriptor outputDescriptor) {
+ Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
+ String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+ outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
+ addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+ }
+
+ @Override
+ public void addTable(TableDescriptor tableDescriptor) {
+ Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
+ String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
+ tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
+ }
+
+ @Override
+ public Map<String, InputDescriptor> getInputDescriptors() {
+ return Collections.unmodifiableMap(inputDescriptors);
+ }
+
+ @Override
+ public Map<String, OutputDescriptor> getOutputDescriptors() {
+ return Collections.unmodifiableMap(outputDescriptors);
+ }
+
+ @Override
+ public Set<String> getBroadcastStreams() {
+ return Collections.unmodifiableSet(broadcastStreams);
+ }
+
+ @Override
+ public Set<TableDescriptor> getTableDescriptors() {
+ return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+ }
+
+ @Override
+ public Set<SystemDescriptor> getSystemDescriptors() {
+ // We enforce that users must not use different system descriptor instances for the same system name
+ // when getting an input/output stream or setting the default system descriptor
+ return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+ }
+
+ /**
+ * Get the user-defined {@link TaskFactory}
+ * @return the {@link TaskFactory} object
+ */
+ public TaskFactory getTaskFactory() {
+ return taskFactory;
+ }
+
+ private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+ Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
+ || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
+ "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
+ systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
index fe8bc66..5b3636b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
@@ -19,34 +19,40 @@
package org.apache.samza.container;
/**
- * A Listener for {@link org.apache.samza.container.SamzaContainer} lifecycle events.
+ * A Listener for {@link SamzaContainer} lifecycle events.
*/
public interface SamzaContainerListener {
/**
- * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to
+ * Method invoked when the {@link SamzaContainer} state is {@link org.apache.samza.SamzaContainerStatus#NOT_STARTED}
+ * and is about to transition to {@link org.apache.samza.SamzaContainerStatus#STARTING} to start the initialization sequence.
+ */
+ void beforeStart();
+
+ /**
+ * Method invoked after the {@link SamzaContainer} has successfully transitioned to
* the {@link org.apache.samza.SamzaContainerStatus#STARTED} state and is about to start the
* {@link org.apache.samza.container.RunLoop}
*/
- void onContainerStart();
+ void afterStart();
/**
- * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has successfully transitioned to
+ * Method invoked after the {@link SamzaContainer} has successfully transitioned to
* {@link org.apache.samza.SamzaContainerStatus#STOPPED} state. Details on state transitions can be found in
* {@link org.apache.samza.SamzaContainerStatus}
* <br>
* <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
* exceptions/errors.
*/
- void onContainerStop();
+ void afterStop();
/**
- * Method invoked when the {@link org.apache.samza.container.SamzaContainer} has transitioned to
+ * Method invoked after the {@link SamzaContainer} has transitioned to
* {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
* {@link org.apache.samza.SamzaContainerStatus}
* <br>
- * <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop()}.
+ * <b>Note</b>: {@link #afterFailure(Throwable)} is mutually exclusive to {@link #afterStop()}.
* @param t Throwable that caused the container failure.
*/
- void onContainerFailed(Throwable t);
+ void afterFailure(Throwable t);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index e95ed26..ea892fe 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -47,9 +47,10 @@ import static org.apache.samza.util.StreamUtil.*;
/**
- * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
+ * The ExecutionPlanner creates the physical execution graph for the {@link OperatorSpecGraph}, and
* the intermediate topics needed for the execution.
*/
+// TODO: ExecutionPlanner needs to be able to generate single node JobGraph for low-level TaskApplication as well (SAMZA-1811)
public class ExecutionPlanner {
private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
@@ -63,7 +64,7 @@ public class ExecutionPlanner {
this.streamManager = streamManager;
}
- public ExecutionPlan plan(OperatorSpecGraph specGraph) throws Exception {
+ public ExecutionPlan plan(OperatorSpecGraph specGraph) {
validateConfig();
// create physical job graph based on stream graph
@@ -91,7 +92,7 @@ public class ExecutionPlanner {
}
/**
- * Create the physical graph from StreamGraph
+ * Create the physical graph from {@link OperatorSpecGraph}
*/
/* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
JobGraph jobGraph = new JobGraph(config, specGraph);
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 2f210f2..f49e6db 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -195,12 +195,6 @@ import org.slf4j.LoggerFactory;
}
/**
- * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
- * @param streamSpec spec of the StreamEdge
- * @return stream edge
- */
-
- /**
* Returns the job nodes to be executed in the topological order
* @return unmodifiable list of {@link JobNode}
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
new file mode 100644
index 0000000..a2050e5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.TaskApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.table.TableConfigGenerator;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a temporary helper class to include all common logic to generate {@link JobConfig}s for high- and low-level
+ * applications in {@link org.apache.samza.runtime.LocalApplicationRunner} and {@link org.apache.samza.runtime.RemoteApplicationRunner}.
+ *
+ * TODO: Fix SAMZA-1811 to consolidate this class with {@link ExecutionPlanner}
+ */
+public abstract class JobPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(JobPlanner.class);
+
+  protected final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
+  protected final Config config;
+
+  JobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+    this.appDesc = descriptor;
+    this.config = descriptor.getConfig();
+  }
+
+  /**
+   * Generates the {@link JobConfig}s for the application held by this planner.
+   *
+   * @return a single job config for a {@link TaskApplicationDescriptorImpl}, or the job configs produced by
+   *         {@link #prepareStreamJobs(StreamApplicationDescriptorImpl)} for a {@link StreamApplicationDescriptorImpl}
+   * @throws SamzaException if preparing the jobs of a stream application fails
+   * @throws IllegalArgumentException if the descriptor is neither a task nor a stream application descriptor
+   */
+  public List<JobConfig> prepareJobs() {
+    String appId = new ApplicationConfig(appDesc.getConfig()).getGlobalAppId();
+    if (appDesc instanceof TaskApplicationDescriptorImpl) {
+      return Collections.singletonList(prepareTaskJob((TaskApplicationDescriptorImpl) appDesc));
+    } else if (appDesc instanceof StreamApplicationDescriptorImpl) {
+      try {
+        return prepareStreamJobs((StreamApplicationDescriptorImpl) appDesc);
+      } catch (Exception e) {
+        throw new SamzaException("Failed to generate JobConfig for StreamApplication " + appId, e);
+      }
+    }
+    throw new IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be either TaskApplicationDescriptorImpl or "
+        + "StreamApplicationDescriptorImpl. class %s is not supported", appDesc.getClass().getName()));
+  }
+
+  /**
+   * Generates the job configs for a high-level stream application. Implemented by subclasses.
+   *
+   * @param streamAppDesc the stream application descriptor to plan
+   * @return the job configs of the planned application
+   * @throws Exception on any planning failure; {@link #prepareJobs()} wraps it in a {@link SamzaException}
+   */
+  abstract List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception;
+
+  /**
+   * Creates a {@link StreamManager} for the given config and starts it. The caller must stop it.
+   *
+   * @param config the config used to build the stream manager
+   * @return the started stream manager
+   */
+  StreamManager buildAndStartStreamManager(Config config) {
+    StreamManager streamManager = new StreamManager(config);
+    streamManager.start();
+    return streamManager;
+  }
+
+  /**
+   * Same as {@link #getExecutionPlan(OperatorSpecGraph, String)} without a run id.
+   */
+  ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph) {
+    return getExecutionPlan(specGraph, null);
+  }
+
+  /**
+   * Creates the physical execution plan for the given operator graph.
+   *
+   * @param specGraph the logical operator graph of the application
+   * @param runId optional run id; when non-empty it is set as {@link ApplicationConfig#APP_RUN_ID}
+   * @return the execution plan generated by {@link ExecutionPlanner}
+   */
+  /* package private */
+  ExecutionPlan getExecutionPlan(OperatorSpecGraph specGraph, String runId) {
+
+    // update application configs
+    Map<String, String> cfg = new HashMap<>();
+    if (StringUtils.isNotEmpty(runId)) {
+      cfg.put(ApplicationConfig.APP_RUN_ID, runId);
+    }
+
+    // input/output/system descriptor generated configuration; it has higher priority than user-provided configuration
+    Map<String, String> systemStreamConfigs = expandSystemStreamConfigs(appDesc);
+
+    // The application mode must be decided on the effective stream configs. Descriptor-generated settings
+    // (e.g. an input descriptor marked as bounded) are not in the user config, so overlay them before checking.
+    Map<String, String> effectiveStreamConfig = new HashMap<>(config);
+    effectiveStreamConfig.putAll(systemStreamConfigs);
+    StreamConfig streamConfig = new StreamConfig(new MapConfig(effectiveStreamConfig));
+    Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+    inputStreams.removeAll(specGraph.getOutputStreams().keySet());
+    ApplicationConfig.ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded)
+        ? ApplicationConfig.ApplicationMode.BATCH : ApplicationConfig.ApplicationMode.STREAM;
+    cfg.put(ApplicationConfig.APP_MODE, mode.name());
+
+    // merge user-provided configuration with input/output descriptor generated configuration
+    cfg.putAll(systemStreamConfigs);
+
+    // adding app.class in the configuration
+    cfg.put(ApplicationConfig.APP_CLASS, appDesc.getAppClass().getName());
+
+    // create the physical execution plan and merge with overrides. This works for a single-stage job now
+    // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
+    Config mergedConfig = JobNode.mergeJobConfig(config, new MapConfig(cfg));
+    // creating the StreamManager to get all input/output streams' metadata for planning
+    StreamManager streamManager = buildAndStartStreamManager(mergedConfig);
+    try {
+      ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, streamManager);
+      return planner.plan(specGraph);
+    } finally {
+      streamManager.stop();
+    }
+  }
+
+  /**
+   * Write the execution plan JSON to {@code plan.json} in the directory named by the
+   * {@link ShellCommandConfig#EXECUTION_PLAN_DIR()} environment variable, if that variable is set.
+   * This is best-effort: failures are logged and never propagated.
+   *
+   * @param planJson JSON representation of the plan
+   */
+  final void writePlanJsonFile(String planJson) {
+    try {
+      String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
+      if (planPath == null || planPath.isEmpty()) {
+        return;
+      }
+      File file = new File(planPath, "plan.json");
+      try (PrintWriter writer = new PrintWriter(file, "UTF-8")) {
+        writer.println("plan='" + planJson + "'");
+        // PrintWriter never throws on write failures; surface them here instead of losing them silently
+        if (writer.checkError()) {
+          LOG.warn("Failed to write execution plan json to file {}", file);
+        }
+      }
+      // Make the file readable by everyone. This must run after the file exists: setReadable on a missing file has no effect.
+      file.setReadable(true, false);
+    } catch (Exception e) {
+      LOG.warn("Failed to write execution plan json to file", e);
+    }
+  }
+
+  // TODO: SAMZA-1814: the following configuration generation still misses serde configuration generation,
+  // side input configuration, broadcast input and task inputs configuration generation for low-level task
+  // applications
+  /**
+   * Generates a single node job configuration for a low-level task application.
+   * Descriptor-generated system/stream configs, table configs and app.class are merged on top of the user config.
+   */
+  private JobConfig prepareTaskJob(TaskApplicationDescriptorImpl taskAppDesc) {
+    // start from the system and stream configs generated by the descriptors (not from the user config)
+    Map<String, String> cfg = new HashMap<>(expandSystemStreamConfigs(taskAppDesc));
+    // expand table configs; the generator is handed the configs collected so far
+    cfg.putAll(expandTableConfigs(cfg, taskAppDesc));
+    // adding app.class in the configuration
+    cfg.put(ApplicationConfig.APP_CLASS, taskAppDesc.getAppClass().getName());
+    // merge the generated configs with the user config. This works for a single-stage job now
+    // TODO: This should all be consolidated with ExecutionPlanner after fixing SAMZA-1811
+    return new JobConfig(JobNode.mergeJobConfig(config, new MapConfig(cfg)));
+  }
+
+  /**
+   * Collects the configs generated by all input, output and system descriptors, plus the default system name
+   * (under {@link JobConfig#JOB_DEFAULT_SYSTEM()}) when a default system descriptor is set.
+   * Later descriptors overwrite keys written by earlier ones.
+   */
+  private Map<String, String> expandSystemStreamConfigs(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    Map<String, String> systemStreamConfigs = new HashMap<>();
+    appDesc.getInputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
+    appDesc.getOutputDescriptors().forEach((key, value) -> systemStreamConfigs.putAll(value.toConfig()));
+    appDesc.getSystemDescriptors().forEach(sd -> systemStreamConfigs.putAll(sd.toConfig()));
+    appDesc.getDefaultSystemDescriptor().ifPresent(dsd ->
+        systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), dsd.getSystemName()));
+    return systemStreamConfigs;
+  }
+
+  /**
+   * Generates the table configs for all table descriptors registered with the application.
+   *
+   * @param originConfig configs collected so far, passed to {@link TableConfigGenerator}
+   * @param appDesc the application descriptor holding the table descriptors
+   * @return the generated table configs
+   */
+  private Map<String, String> expandTableConfigs(Map<String, String> originConfig,
+      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    List<TableSpec> tableSpecs = new ArrayList<>();
+    appDesc.getTableDescriptors().forEach(td -> tableSpecs.add(((BaseTableDescriptor) td).getTableSpec()));
+    return TableConfigGenerator.generateConfigsForTableSpecs(new MapConfig(originConfig), tableSpecs);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
new file mode 100644
index 0000000..7996d6b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)}
+ * for standalone Samza processors.
+ *
+ * TODO: we need to consolidate this with {@link ExecutionPlanner} after SAMZA-1811.
+ */
+public class LocalJobPlanner extends JobPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalJobPlanner.class);
+  private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData";
+
+  // identifies this processor when coordinating intermediate stream creation with other processors
+  private final String uid = UUID.randomUUID().toString();
+
+  public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+    super(descriptor);
+  }
+
+  /**
+   * Plans the high-level application, writes and logs the plan JSON, and creates the intermediate streams.
+   *
+   * @throws SamzaException if the plan contains no jobs
+   */
+  @Override
+  List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception {
+    // for high-level DAG, generating the plan and job configs
+    // 1. initialize and plan
+    ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph());
+
+    String executionPlanJson = plan.getPlanAsJson();
+    writePlanJsonFile(executionPlanJson);
+    LOG.info("Execution Plan: \n{}", executionPlanJson);
+    // identical plans hash to the same id, so processors running the same plan coordinate on the same lock
+    String planId = String.valueOf(executionPlanJson.hashCode());
+
+    if (plan.getJobConfigs().isEmpty()) {
+      throw new SamzaException("No jobs in the plan.");
+    }
+
+    // 2. create the necessary streams
+    // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
+    // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
+    // to be used for the whole application
+    JobConfig jobConfig = plan.getJobConfigs().get(0);
+    // create the StreamManager to create intermediate streams in the plan
+    StreamManager streamManager = buildAndStartStreamManager(jobConfig);
+    try {
+      createStreams(planId, plan.getIntermediateStreams(), streamManager);
+    } finally {
+      streamManager.stop();
+    }
+    return plan.getJobConfigs();
+  }
+
+  /**
+   * Create intermediate streams using {@link StreamManager}.
+   * If the configured coordination utils factory provides {@link CoordinationUtils}, the processors compete for a
+   * distributed lock keyed by {@code planId}: the processor that obtains it creates the streams and marks the lock
+   * state as set, while the others skip creation. If no {@link CoordinationUtils} is available, every processor
+   * creates the streams itself, which requires stream creation to be idempotent.
+   *
+   * @param planId a unique identifier representing the plan used for coordination purpose
+   * @param intStreams list of intermediate {@link StreamSpec}s
+   * @param streamManager the {@link StreamManager} used to create streams
+   * @throws SamzaException if acquiring the lock times out
+   */
+  private void createStreams(String planId, List<StreamSpec> intStreams, StreamManager streamManager) {
+    if (intStreams.isEmpty()) {
+      LOG.info("Set of intermediate streams is empty. Nothing to create.");
+      return;
+    }
+    LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
+    // Move the scope of coordination utils within stream creation to address long idle connection problem.
+    // Refer SAMZA-1385 for more details
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
+    String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
+    CoordinationUtils coordinationUtils =
+        jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config);
+    if (coordinationUtils == null) {
+      LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
+      // each application process will try creating the streams, which
+      // requires stream creation to be idempotent
+      streamManager.createStreams(intStreams);
+      return;
+    }
+
+    DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId);
+    try {
+      // check if the processor needs to go through leader election and stream creation
+      if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
+        LOG.info("lock acquired for streams creation by {}", uid);
+        streamManager.createStreams(intStreams);
+        lockWithState.unlockAndSet();
+      } else {
+        LOG.info("Processor {} did not obtain the lock for streams creation. They must've been created by another processor.", uid);
+      }
+    } catch (TimeoutException e) {
+      // String.format uses %s, not the SLF4J {} placeholder
+      String msg = String.format("Processor %s failed to get the lock for stream initialization", uid);
+      throw new SamzaException(msg, e);
+    } finally {
+      coordinationUtils.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
new file mode 100644
index 0000000..254ff97
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Temporary helper class with specific implementation of {@link JobPlanner#prepareStreamJobs(StreamApplicationDescriptorImpl)}
+ * for remote-launched Samza processors (e.g. in YARN).
+ *
+ * TODO: we need to consolidate this class with {@link ExecutionPlanner} after SAMZA-1811.
+ */
+public class RemoteJobPlanner extends JobPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteJobPlanner.class);
+
+  public RemoteJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+    super(descriptor);
+  }
+
+  /**
+   * Plans the high-level application with a fresh run id, writes the plan JSON, and creates the intermediate
+   * streams. In batch mode, streams recorded in the previous run's config are cleared first.
+   *
+   * @throws SamzaException if the plan contains no jobs
+   */
+  @Override
+  List<JobConfig> prepareStreamJobs(StreamApplicationDescriptorImpl streamAppDesc) throws Exception {
+    // for high-level DAG, generate the plan and job configs
+    // TODO: run.id needs to be set for standalone: SAMZA-1531
+    // run.id is the current system time followed by the first 8 hex characters of a random UUID to avoid collisions
+    String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
+    LOG.info("The run id for this run is {}", runId);
+
+    // 1. initialize and plan
+    ExecutionPlan plan = getExecutionPlan(streamAppDesc.getOperatorSpecGraph(), runId);
+    writePlanJsonFile(plan.getPlanAsJson());
+
+    if (plan.getJobConfigs().isEmpty()) {
+      throw new SamzaException("No jobs in the plan.");
+    }
+
+    // 2. create the necessary streams
+    // TODO: this works for single-job applications. For multi-job applications, ExecutionPlan should return an AppConfig
+    // to be used for the whole application
+    JobConfig jobConfig = plan.getJobConfigs().get(0);
+    // create the StreamManager to create intermediate streams in the plan
+    StreamManager streamManager = buildAndStartStreamManager(jobConfig);
+    try {
+      if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
+        streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
+      }
+      streamManager.createStreams(plan.getIntermediateStreams());
+    } finally {
+      streamManager.stop();
+    }
+    return plan.getJobConfigs();
+  }
+
+  /**
+   * Bootstraps the coordinator stream and returns the config stored in it, i.e. the config of the previous run.
+   * The consumer is always stopped once started, even when bootstrapping fails.
+   *
+   * @return the config read from the coordinator stream
+   */
+  private Config getConfigFromPrevRun() {
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
+    consumer.register();
+    consumer.start();
+    try {
+      consumer.bootstrap();
+    } finally {
+      consumer.stop();
+    }
+
+    Config cfg = consumer.getConfig();
+    LOG.info("Previous config is: {}", cfg);
+    return cfg;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 3c1a1dc..5411af3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,10 +19,12 @@
package org.apache.samza.operators;
+import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Collection;
import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
@@ -53,7 +55,7 @@ import org.apache.samza.table.TableSpec;
/**
* The {@link MessageStream} implementation that lets users describe their logical DAG.
- * Users can obtain an instance by calling {@link StreamGraph#getInputStream}.
+ * Users can obtain an instance by calling {@link StreamApplicationDescriptorImpl#getInputStream}.
* <p>
* Each {@link MessageStreamImpl} is associated with a single {@link OperatorSpec} in the DAG and allows
* users to chain further operators on its {@link OperatorSpec}. In other words, a {@link MessageStreamImpl}
@@ -63,54 +65,54 @@ import org.apache.samza.table.TableSpec;
*/
public class MessageStreamImpl<M> implements MessageStream<M> {
/**
- * The {@link StreamGraphSpec} that contains this {@link MessageStreamImpl}
+ * The {@link StreamApplicationDescriptorImpl} that contains this {@link MessageStreamImpl}
*/
- private final StreamGraphSpec graph;
+ private final StreamApplicationDescriptorImpl streamAppDesc;
/**
* The {@link OperatorSpec} associated with this {@link MessageStreamImpl}
*/
private final OperatorSpec operatorSpec;
- public MessageStreamImpl(StreamGraphSpec graph, OperatorSpec<?, M> operatorSpec) {
- this.graph = graph;
+ public MessageStreamImpl(StreamApplicationDescriptorImpl streamAppDesc, OperatorSpec<?, M> operatorSpec) {
+ this.streamAppDesc = streamAppDesc;
this.operatorSpec = operatorSpec;
}
@Override
public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
- String opId = this.graph.getNextOpId(OpCode.MAP);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.MAP);
StreamOperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
- return new MessageStreamImpl<>(this.graph, op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
- String opId = this.graph.getNextOpId(OpCode.FILTER);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.FILTER);
StreamOperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
- return new MessageStreamImpl<>(this.graph, op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
- String opId = this.graph.getNextOpId(OpCode.FLAT_MAP);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.FLAT_MAP);
StreamOperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
- return new MessageStreamImpl<>(this.graph, op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
public void sink(SinkFunction<? super M> sinkFn) {
- String opId = this.graph.getNextOpId(OpCode.SINK);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.SINK);
SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, opId);
this.operatorSpec.registerNextOperatorSpec(op);
}
@Override
public void sendTo(OutputStream<M> outputStream) {
- String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
(OutputStreamImpl<M>) outputStream, opId);
this.operatorSpec.registerNextOperatorSpec(op);
@@ -118,10 +120,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String userDefinedId) {
- String opId = this.graph.getNextOpId(OpCode.WINDOW, userDefinedId);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.WINDOW, userDefinedId);
OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId);
this.operatorSpec.registerNextOperatorSpec(op);
- return new MessageStreamImpl<>(this.graph, op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
@@ -130,7 +132,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
Duration ttl, String userDefinedId) {
if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
- String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN, userDefinedId);
OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
JoinOperatorSpec<K, M, OM, JM> op =
OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn, keySerde,
@@ -138,35 +140,35 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
this.operatorSpec.registerNextOperatorSpec(op);
otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) op);
- return new MessageStreamImpl<>(this.graph, op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) {
- String opId = this.graph.getNextOpId(OpCode.JOIN);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.JOIN);
TableSpec tableSpec = ((TableImpl) table).getTableSpec();
StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, opId);
this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
- return new MessageStreamImpl<>(this.graph, joinOpSpec);
+ return new MessageStreamImpl<>(this.streamAppDesc, joinOpSpec);
}
@Override
public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
if (otherStreams.isEmpty()) return this;
- String opId = this.graph.getNextOpId(OpCode.MERGE);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.MERGE);
StreamOperatorSpec<M, M> op = OperatorSpecs.createMergeOperatorSpec(opId);
this.operatorSpec.registerNextOperatorSpec(op);
otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(op));
- return new MessageStreamImpl<>(this.graph, op);
+ return new MessageStreamImpl<>(this.streamAppDesc, op);
}
@Override
public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> keyExtractor,
MapFunction<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String userDefinedId) {
- String opId = this.graph.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
- IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, serde, false);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.PARTITION_BY, userDefinedId);
+ IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.streamAppDesc.getIntermediateStream(opId, serde, false);
if (!intermediateStream.isKeyed()) {
// this can only happen when the default serde partitionBy variant is being used
throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde.");
@@ -185,7 +187,7 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public <K, V> void sendTo(Table<KV<K, V>> table) {
- String opId = this.graph.getNextOpId(OpCode.SEND_TO);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.SEND_TO);
SendToTableOperatorSpec<K, V> op =
OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableSpec(), opId);
this.operatorSpec.registerNextOperatorSpec(op);
@@ -193,8 +195,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
@Override
public MessageStream<M> broadcast(Serde<M> serde, String userDefinedId) {
- String opId = this.graph.getNextOpId(OpCode.BROADCAST, userDefinedId);
- IntermediateMessageStreamImpl<M> intermediateStream = this.graph.getIntermediateStream(opId, serde, true);
+ String opId = this.streamAppDesc.getNextOpId(OpCode.BROADCAST, userDefinedId);
+ IntermediateMessageStreamImpl<M> intermediateStream = this.streamAppDesc.getIntermediateStream(opId, serde, true);
BroadcastOperatorSpec<M> broadcastOperatorSpec =
OperatorSpecs.createBroadCastOperatorSpec(intermediateStream.getOutputStream(), opId);
this.operatorSpec.registerNextOperatorSpec(broadcastOperatorSpec);
@@ -210,7 +212,8 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
* Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
* @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
*/
- protected OperatorSpec<?, M> getOperatorSpec() {
+ @VisibleForTesting
+ public OperatorSpec<?, M> getOperatorSpec() {
return this.operatorSpec;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index b6c3dae..b75b1e8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -33,10 +34,11 @@ import org.apache.samza.table.TableSpec;
/**
- * Defines the serialized format of {@link StreamGraphSpec}. This class encapsulates all getter methods to get the {@link OperatorSpec}
- * initialized in the {@link StreamGraphSpec} and constructsthe corresponding serialized instances of {@link OperatorSpec}.
- * The {@link StreamGraphSpec} and {@link OperatorSpec} instances included in this class are considered as immutable and read-only.
- * The instance of {@link OperatorSpecGraph} should only be used in runtime to construct {@link org.apache.samza.task.StreamOperatorTask}.
+ * Defines the serialized format of the operator graph in {@link StreamApplicationDescriptorImpl}. This class encapsulates all
+ * getter methods to get the {@link OperatorSpec} initialized in the {@link StreamApplicationDescriptorImpl} and constructs the
+ * corresponding serialized instances of {@link OperatorSpec}. The {@link StreamApplicationDescriptorImpl} and {@link OperatorSpec}
+ * instances included in this class are considered as immutable and read-only. The instance of {@link OperatorSpecGraph}
+ * should only be used in runtime to construct {@link org.apache.samza.task.StreamOperatorTask}.
*/
public class OperatorSpecGraph implements Serializable {
// We use a LHM for deterministic order in initializing and closing operators.
@@ -51,11 +53,11 @@ public class OperatorSpecGraph implements Serializable {
private transient final SerializableSerde<OperatorSpecGraph> opSpecGraphSerde = new SerializableSerde<>();
private transient final byte[] serializedOpSpecGraph;
- OperatorSpecGraph(StreamGraphSpec graphSpec) {
- this.inputOperators = graphSpec.getInputOperators();
- this.outputStreams = graphSpec.getOutputStreams();
- this.broadcastStreams = graphSpec.getBroadcastStreams();
- this.tables = graphSpec.getTables();
+ public OperatorSpecGraph(StreamApplicationDescriptorImpl streamAppDesc) {
+ this.inputOperators = streamAppDesc.getInputOperators();
+ this.outputStreams = streamAppDesc.getOutputStreams();
+ this.broadcastStreams = streamAppDesc.getBroadcastStreams();
+ this.tables = streamAppDesc.getTables();
this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
this.hasWindowOrJoins = checkWindowOrJoins();
this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
@@ -78,7 +80,7 @@ public class OperatorSpecGraph implements Serializable {
}
/**
- * Get all {@link OperatorSpec}s available in this {@link StreamGraphSpec}
+ * Get all {@link OperatorSpec}s available in this {@link StreamApplicationDescriptorImpl}
*
* @return all available {@link OperatorSpec}s
*/
@@ -87,9 +89,9 @@ public class OperatorSpecGraph implements Serializable {
}
/**
- * Returns <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator
+ * Returns <tt>true</tt> iff this {@link StreamApplicationDescriptorImpl} contains a join or a window operator
*
- * @return <tt>true</tt> iff this {@link StreamGraphSpec} contains a join or a window operator
+ * @return <tt>true</tt> iff this {@link StreamApplicationDescriptorImpl} contains a join or a window operator
*/
public boolean hasWindowOrJoins() {
return hasWindowOrJoins;
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
deleted file mode 100644
index 8eb2425..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class defines:
- * 1) an implementation of {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
- * create the DAG of transforms.
- * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG
- */
-public class StreamGraphSpec implements StreamGraph {
- private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class);
- private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
-
- // We use a LHM for deterministic order in initializing and closing operators.
- private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
- private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
- private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
- private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
- private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
- private final Set<String> broadcastStreams = new HashSet<>();
- private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
- private final Config config;
-
- /**
- * The 0-based position of the next operator in the graph.
- * Part of the unique ID for each OperatorSpec in the graph.
- * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}.
- */
- private int nextOpNum = 0;
- private final Set<String> operatorIds = new HashSet<>();
- private ContextManager contextManager = null;
- private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
-
- public StreamGraphSpec(Config config) {
- this.config = config;
- }
-
- @Override
- public void setDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
- Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
- String defaultSystemName = defaultSystemDescriptor.getSystemName();
- Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
- "Default system must be set before creating any input or output streams.");
- checkSystemDescriptorUniqueness(defaultSystemDescriptor, defaultSystemName);
- systemDescriptors.put(defaultSystemName, defaultSystemDescriptor);
- this.defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
- }
-
- @Override
- public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
- SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
- Optional<StreamExpander> expander = systemDescriptor.getExpander();
- if (expander.isPresent()) {
- return expander.get().apply(this, inputDescriptor);
- }
-
- String streamId = inputDescriptor.getStreamId();
- Preconditions.checkState(!inputOperators.containsKey(streamId),
- "getInputStream must not be called multiple times with the same streamId: " + streamId);
- Preconditions.checkState(!inputDescriptors.containsKey(streamId),
- "getInputStream must not be called multiple times with the same input descriptor: " + streamId);
- String systemName = systemDescriptor.getSystemName();
- checkSystemDescriptorUniqueness(systemDescriptor, systemName);
-
- Serde serde = inputDescriptor.getSerde();
- KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
- if (outputStreams.containsKey(streamId)) {
- OutputStreamImpl outputStream = outputStreams.get(streamId);
- Serde keySerde = outputStream.getKeySerde();
- Serde valueSerde = outputStream.getValueSerde();
- Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
- String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
- + "stream level, so the same key and message Serde must be used for both.", streamId));
- }
-
- boolean isKeyed = serde instanceof KVSerde;
- InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
- InputOperatorSpec inputOperatorSpec =
- OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
- transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
- inputOperators.put(streamId, inputOperatorSpec);
- inputDescriptors.put(streamId, inputDescriptor);
- systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
- return new MessageStreamImpl(this, inputOperators.get(streamId));
- }
-
- @Override
- public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
- String streamId = outputDescriptor.getStreamId();
- Preconditions.checkState(!outputStreams.containsKey(streamId),
- "getOutputStream must not be called multiple times with the same streamId: " + streamId);
- Preconditions.checkState(!outputDescriptors.containsKey(streamId),
- "getOutputStream must not be called multiple times with the same output descriptor: " + streamId);
- SystemDescriptor systemDescriptor = outputDescriptor.getSystemDescriptor();
- String systemName = systemDescriptor.getSystemName();
- checkSystemDescriptorUniqueness(systemDescriptor, systemName);
-
- Serde serde = outputDescriptor.getSerde();
- KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
- if (inputOperators.containsKey(streamId)) {
- InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
- Serde keySerde = inputOperatorSpec.getKeySerde();
- Serde valueSerde = inputOperatorSpec.getValueSerde();
- Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
- String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
- + "stream level, so the same key and message Serde must be used for both.", streamId));
- }
-
- boolean isKeyed = serde instanceof KVSerde;
- outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
- outputDescriptors.put(streamId, outputDescriptor);
- systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
- return outputStreams.get(streamId);
- }
-
- @Override
- public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
- String tableId = tableDescriptor.getTableId();
- Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
- String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
- TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec();
- if (tables.containsKey(tableSpec)) {
- throw new IllegalStateException(
- String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
- }
- tables.put(tableSpec, new TableImpl(tableSpec));
- return tables.get(tableSpec);
- }
-
- @Override
- public StreamGraph withContextManager(ContextManager contextManager) {
- this.contextManager = contextManager;
- return this;
- }
-
- public ContextManager getContextManager() {
- return this.contextManager;
- }
-
- public OperatorSpecGraph getOperatorSpecGraph() {
- return new OperatorSpecGraph(this);
- }
-
- /**
- * Gets the unique ID for the next operator in the graph. The ID is of the following format:
- * jobName-jobId-opCode-(userDefinedId|nextOpNum);
- *
- * @param opCode the {@link OpCode} of the next operator
- * @param userDefinedId the optional user-provided name of the next operator or null
- * @return the unique ID for the next operator in the graph
- */
- public String getNextOpId(OpCode opCode, String userDefinedId) {
- if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
- throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
- }
-
- String nextOpId = String.format("%s-%s-%s-%s",
- config.get(JobConfig.JOB_NAME()),
- config.get(JobConfig.JOB_ID(), "1"),
- opCode.name().toLowerCase(),
- StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
- if (!operatorIds.add(nextOpId)) {
- throw new SamzaException(
- String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
- }
- nextOpNum++;
- return nextOpId;
- }
-
- /**
- * Gets the unique ID for the next operator in the graph. The ID is of the following format:
- * jobName-jobId-opCode-nextOpNum;
- *
- * @param opCode the {@link OpCode} of the next operator
- * @return the unique ID for the next operator in the graph
- */
- public String getNextOpId(OpCode opCode) {
- return getNextOpId(opCode, null);
- }
-
- /**
- * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
- * An intermediate {@link MessageStream} is both an output and an input stream.
- *
- * @param streamId the id of the stream to be created.
- * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
- * is used.
- * @param isBroadcast whether the stream is a broadcast stream.
- * @param <M> the type of messages in the intermediate {@link MessageStream}
- * @return the intermediate {@link MessageStreamImpl}
- */
- @VisibleForTesting
- public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
- Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
- "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
-
- if (serde == null) {
- LOGGER.info("No serde provided for intermediate stream: " + streamId +
- ". Key and message serdes configured for the job.default.system will be used.");
- }
-
- if (isBroadcast) broadcastStreams.add(streamId);
-
- boolean isKeyed;
- KV<Serde, Serde> kvSerdes;
- if (serde == null) { // if no explicit serde available
- isKeyed = true; // assume keyed stream
- kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs
- } else {
- isKeyed = serde instanceof KVSerde;
- kvSerdes = getKVSerdes(streamId, serde);
- }
-
- InputTransformer transformer = (InputTransformer) defaultSystemDescriptorOptional
- .flatMap(SystemDescriptor::getTransformer).orElse(null);
-
- InputOperatorSpec inputOperatorSpec =
- OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
- transformer, isKeyed, this.getNextOpId(OpCode.INPUT, null));
- inputOperators.put(streamId, inputOperatorSpec);
- outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
- return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
- }
-
- Map<String, InputOperatorSpec> getInputOperators() {
- return Collections.unmodifiableMap(inputOperators);
- }
-
- Map<String, OutputStreamImpl> getOutputStreams() {
- return Collections.unmodifiableMap(outputStreams);
- }
-
- Set<String> getBroadcastStreams() {
- return Collections.unmodifiableSet(broadcastStreams);
- }
-
- Map<TableSpec, TableImpl> getTables() {
- return Collections.unmodifiableMap(tables);
- }
-
- public Map<String, InputDescriptor> getInputDescriptors() {
- return Collections.unmodifiableMap(inputDescriptors);
- }
-
- public Map<String, OutputDescriptor> getOutputDescriptors() {
- return Collections.unmodifiableMap(outputDescriptors);
- }
-
- public Set<SystemDescriptor> getSystemDescriptors() {
- // We enforce that users must not use different system descriptor instances for the same system name
- // when getting an input/output stream or setting the default system descriptor
- return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
- }
-
- public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
- return this.defaultSystemDescriptorOptional;
- }
-
- private void checkSystemDescriptorUniqueness(SystemDescriptor systemDescriptor, String systemName) {
- Preconditions.checkState(!systemDescriptors.containsKey(systemName)
- || systemDescriptors.get(systemName) == systemDescriptor,
- "Must not use different system descriptor instances for the same system name: " + systemName);
- }
-
- private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
- Serde keySerde, valueSerde;
-
- if (serde instanceof KVSerde) {
- keySerde = ((KVSerde) serde).getKeySerde();
- valueSerde = ((KVSerde) serde).getValueSerde();
- } else {
- keySerde = new NoOpSerde();
- valueSerde = serde;
- }
-
- if (keySerde instanceof NoOpSerde) {
- LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
- ". Keys will not be (de)serialized");
- }
- if (valueSerde instanceof NoOpSerde) {
- LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
- ". Values will not be (de)serialized");
- }
-
- return KV.of(keySerde, valueSerde);
- }
-}