You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2019/03/29 16:15:43 UTC
[beam] branch master updated: [BEAM-6932] SamzaRunner: migrate to
use new Samza 1.1.0 libraries (#8163)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 91d59e7 [BEAM-6932] SamzaRunner: migrate to use new Samza 1.1.0 libraries (#8163)
91d59e7 is described below
commit 91d59e78ffec3771a1d646c4e320fff571393829
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Fri Mar 29 09:15:30 2019 -0700
[BEAM-6932] SamzaRunner: migrate to use new Samza 1.1.0 libraries (#8163)
---
runners/samza/build.gradle | 2 +-
.../beam/runners/samza/SamzaExecutionContext.java | 122 ++++++++++++++-
.../beam/runners/samza/SamzaPipelineResult.java | 4 +-
.../org/apache/beam/runners/samza/SamzaRunner.java | 167 ++-------------------
.../samza/container/BeamContainerRunner.java | 45 +++++-
.../apache/beam/runners/samza/runtime/DoFnOp.java | 14 +-
.../beam/runners/samza/runtime/GroupByKeyOp.java | 15 +-
.../org/apache/beam/runners/samza/runtime/Op.java | 8 +-
.../beam/runners/samza/runtime/OpAdapter.java | 24 +--
.../beam/runners/samza/runtime/OpMessage.java | 5 +-
.../samza/runtime/SamzaDoFnInvokerRegistrar.java | 4 +-
.../runners/samza/runtime/SamzaDoFnRunners.java | 12 +-
.../samza/runtime/SamzaStoreStateInternals.java | 2 +-
.../samza/runtime/SamzaTimerInternalsFactory.java | 12 +-
.../runners/samza/translation/ConfigBuilder.java | 7 +-
.../translation/FlattenPCollectionsTranslator.java | 14 +-
.../samza/translation/GroupByKeyTranslator.java | 27 +++-
.../samza/translation/ImpulseTranslator.java | 30 ++--
.../translation/ParDoBoundMultiTranslator.java | 18 ++-
.../translation/PortableTranslationContext.java | 88 ++++++++---
.../runners/samza/translation/ReadTranslator.java | 60 ++++----
.../translation/SamzaPublishViewTranslator.java | 13 +-
.../samza/translation/TranslationContext.java | 133 ++++++++++------
.../samza/adapter/BoundedSourceSystemTest.java | 7 +-
.../samza/adapter/UnboundedSourceSystemTest.java | 15 +-
.../runtime/SamzaStoreStateInternalsTest.java | 8 +-
.../runtime/SamzaTimerInternalsFactoryTest.java | 13 +-
27 files changed, 513 insertions(+), 356 deletions(-)
diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index faa4d51..98a5686 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -34,7 +34,7 @@ configurations {
validatesRunner
}
-def samza_version = "0.14.1"
+def samza_version = "1.1.0"
dependencies {
shadow library.java.vendored_guava_20_0
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
index 162c7d8..af65135 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
@@ -17,13 +17,57 @@
*/
package org.apache.beam.runners.samza;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
+import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Runtime context for the Samza runner. */
-public class SamzaExecutionContext {
+public class SamzaExecutionContext implements ApplicationContainerContext {
+ private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutionContext.class);
+ private static final String SAMZA_WORKER_ID = "samza_py_worker_id";
+
+ private final SamzaPipelineOptions options;
private SamzaMetricsContainer metricsContainer;
private JobBundleFactory jobBundleFactory;
+ private GrpcFnServer<FnApiControlClientPoolService> fnControlServer;
+ private GrpcFnServer<GrpcDataService> fnDataServer;
+ private GrpcFnServer<GrpcStateService> fnStateServer;
+ private ControlClientPool controlClientPool;
+ private IdGenerator idGenerator = IdGenerators.incrementingLongs();
+
+ public SamzaExecutionContext(SamzaPipelineOptions options) {
+ this.options = options;
+ }
+
+ public SamzaPipelineOptions getPipelineOptions() {
+ return options;
+ }
public SamzaMetricsContainer getMetricsContainer() {
return this.metricsContainer;
@@ -40,4 +84,80 @@ public class SamzaExecutionContext {
void setJobBundleFactory(JobBundleFactory jobBundleFactory) {
this.jobBundleFactory = jobBundleFactory;
}
+
+ @Override
+ public void start() {
+ checkState(getJobBundleFactory() == null, "jobBundleFactory has been created!");
+
+ if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+ try {
+ controlClientPool = MapControlClientPool.create();
+ final ExecutorService dataExecutor = Executors.newCachedThreadPool();
+
+ fnControlServer =
+ GrpcFnServer.allocatePortAndCreateFor(
+ FnApiControlClientPoolService.offeringClientsToPool(
+ controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
+ ServerFactory.createWithPortSupplier(
+ () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+
+ fnDataServer =
+ GrpcFnServer.allocatePortAndCreateFor(
+ GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
+ ServerFactory.createDefault());
+
+ fnStateServer =
+ GrpcFnServer.allocatePortAndCreateFor(
+ GrpcStateService.create(), ServerFactory.createDefault());
+
+ final long waitTimeoutMs =
+ SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+ final InstructionRequestHandler instructionHandler =
+ controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
+ final EnvironmentFactory environmentFactory =
+ environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
+ // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+ jobBundleFactory =
+ SingleEnvironmentInstanceJobBundleFactory.create(
+ environmentFactory, fnDataServer, fnStateServer, idGenerator);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Running samza in Beam portable mode but failed to create job bundle factory", e);
+ }
+
+ setJobBundleFactory(jobBundleFactory);
+ }
+ }
+
+ @Override
+ public void stop() {
+ closeFnServer(fnControlServer);
+ fnControlServer = null;
+ closeFnServer(fnDataServer);
+ fnDataServer = null;
+ closeFnServer(fnStateServer);
+ fnStateServer = null;
+ }
+
+ private void closeFnServer(GrpcFnServer<?> fnServer) {
+ try (AutoCloseable closer = fnServer) {
+ // do nothing
+ } catch (Exception e) {
+ LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+ }
+ }
+
+ /** The factory to return this {@link SamzaExecutionContext}. */
+ public class Factory implements ApplicationContainerContextFactory<SamzaExecutionContext> {
+
+ @Override
+ public SamzaExecutionContext create(
+ ExternalContext externalContext, JobContext jobContext, ContainerContext containerContext) {
+
+ final MetricsRegistryMap metricsRegistry =
+ (MetricsRegistryMap) containerContext.getContainerMetricsRegistry();
+ SamzaExecutionContext.this.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+ return SamzaExecutionContext.this;
+ }
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
index 9badbc3..db71dc9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
@@ -53,7 +53,7 @@ public class SamzaPipelineResult implements PipelineResult {
@Override
public State cancel() {
- runner.kill(app);
+ runner.kill();
return waitUntilFinish();
}
@@ -88,7 +88,7 @@ public class SamzaPipelineResult implements PipelineResult {
}
private StateInfo getStateInfo() {
- final ApplicationStatus status = runner.status(app);
+ final ApplicationStatus status = runner.status();
switch (status.getStatusCode()) {
case New:
return new StateInfo(State.STOPPED);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index bd4148b..6876162 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -17,24 +17,8 @@
*/
package org.apache.beam.runners.samza;
-import java.time.Duration;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.control.ControlClientPool;
-import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
-import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
-import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
-import org.apache.beam.runners.fnexecution.data.GrpcDataService;
-import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
-import org.apache.beam.runners.fnexecution.state.GrpcStateService;
-import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
import org.apache.beam.runners.samza.translation.PViewToIdMapper;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
@@ -45,19 +29,12 @@ import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.PipelineDotRenderer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.fn.IdGenerators;
-import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PValue;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.runtime.ApplicationRunners;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,15 +44,6 @@ import org.slf4j.LoggerFactory;
*/
public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class);
- // temporarily hardcode the worker id before we start supporting multiple workers
- private static final String SAMZA_WORKER_ID = "samza_py_worker_id";
-
- private GrpcFnServer<FnApiControlClientPoolService> fnControlServer;
- private GrpcFnServer<GrpcDataService> fnDataServer;
- private GrpcFnServer<GrpcStateService> fnStateServer;
- private ControlClientPool controlClientPool;
- private JobBundleFactory jobBundleFactory;
- private ExecutorService dataExecutor;
public static SamzaRunner fromOptions(PipelineOptions opts) {
final SamzaPipelineOptions samzaOptions = SamzaPipelineOptionsValidator.validate(opts);
@@ -88,118 +56,20 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
this.options = options;
}
- private static void closeAutoClosable(AutoCloseable closeable) {
- try (AutoCloseable closer = closeable) {
- // do nothing
- } catch (Exception e) {
- LOG.error(
- "Failed to close {}. Ignore since this is shutdown process...",
- closeable.getClass().getSimpleName(),
- e);
- }
- }
-
- private void setUpContextManager(
- StreamGraph streamGraph, SamzaExecutionContext executionContext) {
- streamGraph.withContextManager(
- new ContextManager() {
- @Override
- public void init(Config config, TaskContext context) {
- if (executionContext.getMetricsContainer() == null) {
- final MetricsRegistryMap metricsRegistry =
- (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
- executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
- }
-
- if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
- if (jobBundleFactory == null) {
- try {
- final long waitTimeoutMs =
- SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
- final InstructionRequestHandler instructionHandler =
- controlClientPool
- .getSource()
- .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
- final EnvironmentFactory environmentFactory =
- environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
- // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
- jobBundleFactory =
- SingleEnvironmentInstanceJobBundleFactory.create(
- environmentFactory,
- fnDataServer,
- fnStateServer,
- IdGenerators.incrementingLongs());
- } catch (Exception e) {
- throw new RuntimeException(
- "Running samza in Beam portable mode but failed to create job bundle factory",
- e);
- }
- executionContext.setJobBundleFactory(jobBundleFactory);
- }
- }
-
- context.setUserContext(executionContext);
- }
-
- @Override
- public void close() {
- closeAutoClosable(fnControlServer);
- fnControlServer = null;
- closeAutoClosable(fnDataServer);
- fnDataServer = null;
- closeAutoClosable(fnStateServer);
- fnStateServer = null;
- if (dataExecutor != null) {
- dataExecutor.shutdown();
- dataExecutor = null;
- }
- controlClientPool = null;
- closeAutoClosable(jobBundleFactory);
- jobBundleFactory = null;
- }
- });
- }
-
- private void setUpFnApiServer() {
- controlClientPool = MapControlClientPool.create();
- dataExecutor = Executors.newCachedThreadPool();
- try {
- fnControlServer =
- GrpcFnServer.allocatePortAndCreateFor(
- FnApiControlClientPoolService.offeringClientsToPool(
- controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
- ServerFactory.createWithPortSupplier(
- () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
-
- fnDataServer =
- GrpcFnServer.allocatePortAndCreateFor(
- GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
- ServerFactory.createDefault());
-
- fnStateServer =
- GrpcFnServer.allocatePortAndCreateFor(
- GrpcStateService.create(), ServerFactory.createDefault());
- } catch (Exception e) {
- LOG.error("Failed to set up fn api servers", e);
- throw new RuntimeException(e);
- }
- }
-
SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
- final SamzaExecutionContext executionContext = new SamzaExecutionContext();
- // TODO: this will be moved to setUpContextManager in samza 1.0 migration
- setUpFnApiServer();
ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder);
- final ApplicationRunner runner = ApplicationRunner.fromConfig(configBuilder.build());
+ final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
final StreamApplication app =
- (streamGraph, config) -> {
- setUpContextManager(streamGraph, executionContext);
+ appDescriptor -> {
+ appDescriptor.withApplicationContainerContextFactory(executionContext.new Factory());
SamzaPortablePipelineTranslator.translate(
- pipeline, new PortableTranslationContext(streamGraph, options));
+ pipeline, new PortableTranslationContext(appDescriptor, options));
};
+ final ApplicationRunner runner =
+ ApplicationRunners.getApplicationRunner(app, configBuilder.build());
final SamzaPipelineResult result = new SamzaPipelineResult(app, runner, executionContext);
- runner.run(app);
+ runner.run();
return result;
}
@@ -217,28 +87,23 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
LOG.debug("Post-processed Beam pipeline:\n{}", PipelineDotRenderer.toDotString(pipeline));
}
- // Add a dummy source for use in special cases (TestStream, empty flatten)
- final PValue dummySource = pipeline.apply("Dummy Input Source", Create.of("dummy"));
final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
- final ApplicationRunner runner = ApplicationRunner.fromConfig(configBuilder.build());
-
- final SamzaExecutionContext executionContext = new SamzaExecutionContext();
+ final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
final StreamApplication app =
- new StreamApplication() {
- @Override
- public void init(StreamGraph streamGraph, Config config) {
- setUpContextManager(streamGraph, executionContext);
- SamzaPipelineTranslator.translate(
- pipeline, new TranslationContext(streamGraph, idMap, options, dummySource));
- }
+ appDescriptor -> {
+ appDescriptor.withApplicationContainerContextFactory(executionContext.new Factory());
+ SamzaPipelineTranslator.translate(
+ pipeline, new TranslationContext(appDescriptor, idMap, options));
};
+ final ApplicationRunner runner =
+ ApplicationRunners.getApplicationRunner(app, configBuilder.build());
final SamzaPipelineResult result = new SamzaPipelineResult(app, runner, executionContext);
- runner.run(app);
+ runner.run();
return result;
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
index c1e0c99..be7aa4f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
@@ -18,34 +18,67 @@
package org.apache.beam.runners.samza.container;
import java.time.Duration;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.context.ExternalContext;
import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.runtime.LocalContainerRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ContainerLaunchUtil;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Runs the beam Yarn container, using the static global job model. */
-public class BeamContainerRunner extends LocalContainerRunner {
+public class BeamContainerRunner implements ApplicationRunner {
private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgFactory.class);
- public BeamContainerRunner(Config config) {
- super(ContainerCfgFactory.jobModel, System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
+ private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
+
+ public BeamContainerRunner(SamzaApplication app, Config config) {
+ this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
+ }
+
+ @Override
+ public void run(ExternalContext externalContext) {
+ Thread.setDefaultUncaughtExceptionHandler(
+ new SamzaUncaughtExceptionHandler(
+ () -> {
+ LOG.info("Exiting process now.");
+ System.exit(1);
+ }));
+
+ ContainerLaunchUtil.run(
+ appDesc,
+ System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()),
+ ContainerCfgFactory.jobModel);
+ }
+
+ @Override
+ public void kill() {
+ // Do nothing. Yarn will kill the container.
}
@Override
- public ApplicationStatus status(StreamApplication app) {
+ public ApplicationStatus status() {
+ // The container is running during the life span of this object.
return ApplicationStatus.Running;
}
@Override
public void waitForFinish() {
+ // Container run is synchronous
+ // so calling waitForFinish() after run() should return immediately
LOG.info("Container has stopped");
}
@Override
public boolean waitForFinish(Duration timeout) {
+ // Container run is synchronous
+ // so calling waitForFinish() after run() should return immediately
LOG.info("Container has stopped");
return true;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 663f0d0..eb1d997 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -54,8 +54,8 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,8 +138,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
@Override
public void open(
Config config,
- TaskContext context,
- TimerRegistry<KeyedTimerData<Void>> timerRegistry,
+ Context context,
+ Scheduler<KeyedTimerData<Void>> timerRegistry,
OpEmitter<OutT> emitter) {
this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -154,12 +154,12 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
- null, context, pipelineOptions, signature, mainOutputTag);
+ null, context.getTaskContext(), pipelineOptions, signature, mainOutputTag);
this.timerInternalsFactory =
SamzaTimerInternalsFactory.createTimerInternalFactory(
keyCoder,
- (TimerRegistry) timerRegistry,
+ (Scheduler) timerRegistry,
getTimerStateId(signature),
nonKeyedStateInternalsFactory,
windowingStrategy,
@@ -170,7 +170,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
if (isPortable) {
SamzaExecutionContext samzaExecutionContext =
- (SamzaExecutionContext) context.getUserContext();
+ (SamzaExecutionContext) context.getApplicationContainerContext();
ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
stageBundleFactory = samzaExecutionContext.getJobBundleFactory().forStage(executableStage);
this.fnRunner =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index 76f7a32..f593ecc 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -46,8 +46,8 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,8 +97,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
@Override
public void open(
Config config,
- TaskContext context,
- TimerRegistry<KeyedTimerData<K>> timerRegistry,
+ Context context,
+ Scheduler<KeyedTimerData<K>> timerRegistry,
OpEmitter<KV<K, OutputT>> emitter) {
this.pipelineOptions =
Base64Serializer.deserializeUnchecked(
@@ -108,7 +108,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
- null, context, pipelineOptions, null, mainOutputTag);
+ null, context.getTaskContext(), pipelineOptions, null, mainOutputTag);
final DoFnRunners.OutputManager outputManager = outputManagerFactory.create(emitter);
@@ -117,7 +117,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
mainOutputTag.getId(),
Collections.singletonMap(
SamzaStoreStateInternals.BEAM_STORE,
- SamzaStoreStateInternals.getBeamStore(context)),
+ SamzaStoreStateInternals.getBeamStore(context.getTaskContext())),
keyCoder,
pipelineOptions.getStoreBatchGetSize());
@@ -170,7 +170,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
windowingStrategy,
DoFnSchemaInformation.create());
- final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getUserContext();
+ final SamzaExecutionContext executionContext =
+ (SamzaExecutionContext) context.getApplicationContainerContext();
this.fnRunner =
DoFnRunnerWithMetrics.wrap(doFnRunner, executionContext.getMetricsContainer(), stepName);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
index 793ce6a..cbf5c46 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.samza.runtime;
import java.io.Serializable;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
/**
@@ -41,8 +41,8 @@ public interface Op<InT, OutT, K> extends Serializable {
*/
default void open(
Config config,
- TaskContext taskContext,
- TimerRegistry<KeyedTimerData<K>> timerRegistry,
+ Context context,
+ Scheduler<KeyedTimerData<K>> timerRegistry,
OpEmitter<OutT> emitter) {}
void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index 49601ff..8b958db 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -24,11 +24,11 @@ import java.util.List;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.task.TaskContext;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class OpAdapter<InT, OutT, K>
implements FlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
WatermarkFunction<OpMessage<OutT>>,
- TimerFunction<KeyedTimerData<K>, OpMessage<OutT>>,
+ ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>,
Serializable {
private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
@@ -46,7 +46,7 @@ public class OpAdapter<InT, OutT, K>
private transient Instant outputWatermark;
private transient OpEmitter<OutT> emitter;
private transient Config config;
- private transient TaskContext taskContext;
+ private transient Context context;
public static <InT, OutT, K> FlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
Op<InT, OutT, K> op) {
@@ -58,18 +58,18 @@ public class OpAdapter<InT, OutT, K>
}
@Override
- public final void init(Config config, TaskContext context) {
+ public final void init(Context context) {
this.outputList = new ArrayList<>();
this.emitter = new OpEmitterImpl();
- this.config = config;
- this.taskContext = context;
+ this.config = context.getJobContext().getConfig();
+ this.context = context;
}
@Override
- public final void registerTimer(TimerRegistry<KeyedTimerData<K>> timerRegistry) {
- assert taskContext != null;
+ public final void schedule(Scheduler<KeyedTimerData<K>> timerRegistry) {
+ assert context != null;
- op.open(config, taskContext, timerRegistry, emitter);
+ op.open(config, context, timerRegistry, emitter);
}
@Override
@@ -124,7 +124,7 @@ public class OpAdapter<InT, OutT, K>
}
@Override
- public Collection<OpMessage<OutT>> onTimer(KeyedTimerData<K> keyedTimerData, long time) {
+ public Collection<OpMessage<OutT>> onCallback(KeyedTimerData<K> keyedTimerData, long time) {
assert outputList.isEmpty();
try {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java
index ab28ac5..34254b9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java
@@ -21,8 +21,9 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;
/**
- * Actual message type used in Samza {@link org.apache.samza.operators.StreamGraph}. It contains
- * either an element of main inputs or the collection results from a view (used as side input).
+ * Actual message type used in Samza {@link org.apache.samza.application.StreamApplication}. It
+ * contains either an element of main inputs or the collection results from a view (used as side
+ * input).
*/
public class OpMessage<T> {
/**
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java
index 5ac9c11..ebb01b0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java
@@ -20,14 +20,14 @@ package org.apache.beam.runners.samza.runtime;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
/** A registrar for Samza DoFnInvoker. */
public interface SamzaDoFnInvokerRegistrar {
/** Returns the invoker for a {@link DoFn}. */
<InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
- DoFn<InputT, OutputT> fn, TaskContext context);
+ DoFn<InputT, OutputT> fn, Context context);
/** Returns the configs for a {@link DoFn}. */
<InputT, OutputT> Map<String, String> configFor(DoFn<InputT, OutputT> fn);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 0dd93ee..692b664 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
import org.joda.time.Instant;
/** A factory for Samza runner translator to create underlying DoFnRunner used in {@link DoFnOp}. */
@@ -61,7 +61,7 @@ public class SamzaDoFnRunners {
DoFn<InT, FnOutT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
String stepName,
- TaskContext taskContext,
+ Context context,
TupleTag<FnOutT> mainOutputTag,
SideInputHandler sideInputHandler,
SamzaTimerInternalsFactory<?> timerInternalsFactory,
@@ -77,10 +77,10 @@ public class SamzaDoFnRunners {
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
final SamzaStoreStateInternals.Factory<?> stateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
- keyCoder, taskContext, pipelineOptions, signature, mainOutputTag);
+ keyCoder, context.getTaskContext(), pipelineOptions, signature, mainOutputTag);
final SamzaExecutionContext executionContext =
- (SamzaExecutionContext) taskContext.getUserContext();
+ (SamzaExecutionContext) context.getApplicationContainerContext();
if (signature.usesState()) {
keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory);
stateInternals = keyedInternals.stateInternals();
@@ -163,10 +163,10 @@ public class SamzaDoFnRunners {
StageBundleFactory stageBundleFactory,
TupleTag<FnOutT> mainOutputTag,
Map<String, TupleTag<?>> idToTupleTagMap,
- TaskContext taskContext,
+ Context context,
String stepName) {
final SamzaExecutionContext executionContext =
- (SamzaExecutionContext) taskContext.getUserContext();
+ (SamzaExecutionContext) context.getApplicationContainerContext();
final DoFnRunner<InT, FnOutT> sdkHarnessDoFnRunner =
new SdkHarnessDoFnRunner<>(
outputManager, stageBundleFactory, mainOutputTag, idToTupleTagMap);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 9597ae9..620aae2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -64,10 +64,10 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Ints;
+import org.apache.samza.context.TaskContext;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
import org.joda.time.Instant;
/** {@link StateInternals} that uses Samza local {@link KeyValueStore} to manage state. */
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 5e16acd..d014a1e 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
private final Coder<K> keyCoder;
- private final TimerRegistry<KeyedTimerData<K>> timerRegistry;
+ private final Scheduler<KeyedTimerData<K>> timerRegistry;
private final int timerBufferSize;
private final SamzaTimerState state;
@@ -61,7 +61,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
private SamzaTimerInternalsFactory(
Coder<K> keyCoder,
- TimerRegistry<KeyedTimerData<K>> timerRegistry,
+ Scheduler<KeyedTimerData<K>> timerRegistry,
int timerBufferSize,
String timerStateId,
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
@@ -75,7 +75,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(
Coder<K> keyCoder,
- TimerRegistry<KeyedTimerData<K>> timerRegistry,
+ Scheduler<KeyedTimerData<K>> timerRegistry,
String timerStateId,
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
WindowingStrategy<?, BoundedWindow> windowingStrategy,
@@ -197,7 +197,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
break;
case PROCESSING_TIME:
- timerRegistry.register(keyedTimerData, timerData.getTimestamp().getMillis());
+ timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
break;
default:
@@ -347,7 +347,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
// since the iterator will reach to the end, it will be closed automatically
while (iter.hasNext()) {
final KeyedTimerData<K> keyedTimerData = iter.next();
- timerRegistry.register(
+ timerRegistry.schedule(
keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index a3578ed..f4c0b8f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -42,7 +42,6 @@ import org.apache.samza.config.factories.PropertiesConfigFactory;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.ByteSerdeFactory;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
/** Builder class to generate configs for BEAM samza runner during runtime. */
@@ -78,6 +77,9 @@ public class ConfigBuilder {
"beamPipelineOptions",
Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));
+ // TODO: remove after we sort out Samza task wrapper
+ config.put("samza.li.task.wrapper.enabled", "false");
+
return new MapConfig(config);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -137,9 +139,6 @@ public class ConfigBuilder {
.put(
JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
PassthroughJobCoordinatorFactory.class.getName())
- .put(
- JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY,
- PassthroughCoordinationUtilsFactory.class.getName())
.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName())
.put(TaskConfig.COMMIT_MS(), "-1")
.put("processor.id", "1")
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
index 61042b0..e72e601 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
@@ -44,6 +44,11 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
@Override
public void translate(
Flatten.PCollections<T> transform, TransformHierarchy.Node node, TranslationContext ctx) {
+ doTranslate(transform, node, ctx);
+ }
+
+ private static <T> void doTranslate(
+ Flatten.PCollections<T> transform, TransformHierarchy.Node node, TranslationContext ctx) {
final PCollection<T> output = ctx.getOutput(transform);
final List<MessageStream<OpMessage<T>>> inputStreams = new ArrayList<>();
@@ -79,6 +84,13 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
PipelineNode.PTransformNode transform,
QueryablePipeline pipeline,
PortableTranslationContext ctx) {
+ doTranslatePortable(transform, pipeline, ctx);
+ }
+
+ private static <T> void doTranslatePortable(
+ PipelineNode.PTransformNode transform,
+ QueryablePipeline pipeline,
+ PortableTranslationContext ctx) {
final List<MessageStream<OpMessage<T>>> inputStreams = ctx.getAllInputMessageStreams(transform);
final String outputId = ctx.getOutputId(transform);
@@ -91,7 +103,7 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
}
// Merge multiple input streams into one, as this is what "flatten" is meant to do
- private MessageStream<OpMessage<T>> mergeInputStreams(
+ private static <T> MessageStream<OpMessage<T>> mergeInputStreams(
List<MessageStream<OpMessage<T>>> inputStreams) {
if (inputStreams.size() == 1) {
return Iterables.getOnlyElement(inputStreams);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 4599f53..13c6f11 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -59,6 +59,13 @@ class GroupByKeyTranslator<K, InputT, OutputT>
PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
TransformHierarchy.Node node,
TranslationContext ctx) {
+ doTranslate(transform, node, ctx);
+ }
+
+ private static <K, InputT, OutputT> void doTranslate(
+ PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+ TransformHierarchy.Node node,
+ TranslationContext ctx) {
final PCollection<KV<K, InputT>> input = ctx.getInput(transform);
final PCollection<KV<K, OutputT>> output = ctx.getOutput(transform);
@@ -96,6 +103,13 @@ class GroupByKeyTranslator<K, InputT, OutputT>
PipelineNode.PTransformNode transform,
QueryablePipeline pipeline,
PortableTranslationContext ctx) {
+ doTranslatePortable(transform, pipeline, ctx);
+ }
+
+ private static <K, InputT, OutputT> void doTranslatePortable(
+ PipelineNode.PTransformNode transform,
+ QueryablePipeline pipeline,
+ PortableTranslationContext ctx) {
final MessageStream<OpMessage<KV<K, InputT>>> inputStream =
ctx.getOneInputMessageStream(transform);
final boolean needRepartition = ctx.getSamzaPipelineOptions().getMaxSourceParallelism() > 1;
@@ -134,7 +148,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
}
- private MessageStream<OpMessage<KV<K, OutputT>>> doTranslateGBK(
+ private static <K, InputT, OutputT> MessageStream<OpMessage<KV<K, OutputT>>> doTranslateGBK(
MessageStream<OpMessage<KV<K, InputT>>> inputStream,
boolean needRepartition,
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn,
@@ -185,10 +199,11 @@ class GroupByKeyTranslator<K, InputT, OutputT>
}
@SuppressWarnings("unchecked")
- private SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
- PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
- Pipeline pipeline,
- KvCoder<K, InputT> kvInputCoder) {
+ private static <K, InputT, OutputT>
+ SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
+ PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+ Pipeline pipeline,
+ KvCoder<K, InputT> kvInputCoder) {
if (transform instanceof GroupByKey) {
return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
SystemReduceFn.buffering(kvInputCoder.getValueCoder());
@@ -203,7 +218,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
}
}
- private boolean needRepartition(TransformHierarchy.Node node, TranslationContext ctx) {
+ private static boolean needRepartition(TransformHierarchy.Node node, TranslationContext ctx) {
if (ctx.getPipelineOptions().getMaxSourceParallelism() == 1) {
// Only one task will be created, no need for repartition
return false;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
index 197350b..baa51ab 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
@@ -17,38 +17,32 @@
*/
package org.apache.beam.runners.samza.translation;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
/**
* Translate {@link org.apache.beam.sdk.transforms.Impulse} to a samza message stream produced by
* {@link
* org.apache.beam.runners.samza.translation.SamzaImpulseSystemFactory.SamzaImpulseSystemConsumer}.
*/
-public class ImpulseTranslator implements TransformTranslator, TransformConfigGenerator {
+public class ImpulseTranslator implements TransformTranslator {
@Override
public void translatePortable(
PipelineNode.PTransformNode transform,
QueryablePipeline pipeline,
PortableTranslationContext ctx) {
- final String outputId = ctx.getOutputId(transform);
- ctx.registerInputMessageStream(outputId);
- }
-
- @Override
- public Map<String, String> createPortableConfig(PipelineNode.PTransformNode transform) {
- final String id = Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
-
- final Map<String, String> config = new HashMap<>();
- final String systemPrefix = "systems." + id;
- final String streamPrefix = "streams." + id;
- config.put(systemPrefix + ".samza.factory", SamzaImpulseSystemFactory.class.getName());
- config.put(streamPrefix + ".samza.system", id);
+ final String outputId = ctx.getOutputId(transform);
+ final GenericSystemDescriptor systemDescriptor =
+ new GenericSystemDescriptor(outputId, SamzaImpulseSystemFactory.class.getName());
+ final GenericInputDescriptor<KV<?, OpMessage<byte[]>>> inputDescriptor =
+ systemDescriptor.getInputDescriptor(outputId, new NoOpSerde<>());
- return config;
+ ctx.registerInputMessageStream(outputId, inputDescriptor);
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index 25eddb7..e4651c4 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -81,6 +81,14 @@ class ParDoBoundMultiTranslator<InT, OutT>
ParDo.MultiOutput<InT, OutT> transform,
TransformHierarchy.Node node,
TranslationContext ctx) {
+ doTranslate(transform, node, ctx);
+ }
+
+ // static for serializing anonymous functions
+ private static <InT, OutT> void doTranslate(
+ ParDo.MultiOutput<InT, OutT> transform,
+ TransformHierarchy.Node node,
+ TranslationContext ctx) {
final PCollection<? extends InT> input = ctx.getInput(transform);
final Map<TupleTag<?>, Coder<?>> outputCoders =
ctx.getCurrentTransform().getOutputs().entrySet().stream()
@@ -179,6 +187,14 @@ class ParDoBoundMultiTranslator<InT, OutT>
PipelineNode.PTransformNode transform,
QueryablePipeline pipeline,
PortableTranslationContext ctx) {
+ doTranslatePortable(transform, pipeline, ctx);
+ }
+
+ // static for serializing anonymous functions
+ private static <InT, OutT> void doTranslatePortable(
+ PipelineNode.PTransformNode transform,
+ QueryablePipeline pipeline,
+ PortableTranslationContext ctx) {
Map<String, String> outputs = transform.getTransform().getOutputsMap();
final RunnerApi.ExecutableStagePayload stagePayload;
@@ -296,7 +312,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
return config;
}
- private class SideInputWatermarkFn
+ private static class SideInputWatermarkFn<InT>
implements FlatMapFunction<OpMessage<InT>, OpMessage<InT>>,
WatermarkFunction<OpMessage<InT>> {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
index 6aa608b..93a28b7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.samza.translation;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -24,13 +25,24 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
/**
* Helper that keeps the mapping from BEAM PCollection id to Samza {@link MessageStream}. It also
@@ -39,13 +51,14 @@ import org.apache.samza.operators.StreamGraph;
*/
public class PortableTranslationContext {
private final Map<String, MessageStream<?>> messsageStreams = new HashMap<>();
- private final StreamGraph streamGraph;
+ private final StreamApplicationDescriptor appDescriptor;
private final SamzaPipelineOptions options;
private int topologicalId;
private final Set<String> registeredInputStreams = new HashSet<>();
- public PortableTranslationContext(StreamGraph streamGraph, SamzaPipelineOptions options) {
- this.streamGraph = streamGraph;
+ public PortableTranslationContext(
+ StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options) {
+ this.appDescriptor = appDescriptor;
this.options = options;
}
@@ -93,33 +106,64 @@ public class PortableTranslationContext {
messsageStreams.put(id, stream);
}
- /** Register an input stream, using the PCollection id as the config id. */
- public void registerInputMessageStream(String id) {
- registerInputMessageStreamWithStreamId(id, id);
+ /** Get output stream by output descriptor. */
+ public <OutT> OutputStream<OutT> getOutputStream(OutputDescriptor<OutT, ?> outputDescriptor) {
+ return appDescriptor.getOutputStream(outputDescriptor);
}
- /** Get output stream by stream id. */
- public <T> OutputStream<T> getOutputStreamById(String outputStreamId) {
- return streamGraph.getOutputStream(outputStreamId);
- }
-
- /**
- * Register an input stream with certain config id.
- *
- * @param id id of the PCollection in the input/output of PTransform
- * @param streamId samza stream id which user can use to customize the stream level config
- */
- public <T> void registerInputMessageStreamWithStreamId(String id, String streamId) {
+ /** Register an input stream with certain config id. */
+ public <T> void registerInputMessageStream(
+ String id, InputDescriptor<KV<?, OpMessage<T>>, ?> inputDescriptor) {
// we want to register it with the Samza graph only once per i/o stream
+ final String streamId = inputDescriptor.getStreamId();
if (registeredInputStreams.contains(streamId)) {
return;
}
final MessageStream<OpMessage<T>> stream =
- streamGraph
- .<org.apache.samza.operators.KV<?, OpMessage<T>>>getInputStream(streamId)
- .map(org.apache.samza.operators.KV::getValue);
+ appDescriptor.getInputStream(inputDescriptor).map(org.apache.samza.operators.KV::getValue);
registerMessageStream(id, stream);
registeredInputStreams.add(streamId);
}
+
+ public WindowedValue.WindowedValueCoder instantiateCoder(
+ String collectionId, RunnerApi.Components components) {
+ PipelineNode.PCollectionNode collectionNode =
+ PipelineNode.pCollection(collectionId, components.getPcollectionsOrThrow(collectionId));
+ try {
+ return (WindowedValue.WindowedValueCoder)
+ WireCoders.instantiateRunnerWireCoder(collectionNode, components);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public WindowingStrategy<?, BoundedWindow> getPortableWindowStrategy(
+ PipelineNode.PTransformNode transform, QueryablePipeline pipeline) {
+ String inputId = Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+ RehydratedComponents rehydratedComponents =
+ RehydratedComponents.forComponents(pipeline.getComponents());
+
+ RunnerApi.WindowingStrategy windowingStrategyProto =
+ pipeline
+ .getComponents()
+ .getWindowingStrategiesOrThrow(
+ pipeline.getComponents().getPcollectionsOrThrow(inputId).getWindowingStrategyId());
+
+ WindowingStrategy<?, ?> windowingStrategy;
+ try {
+ windowingStrategy =
+ WindowingStrategyTranslation.fromProto(windowingStrategyProto, rehydratedComponents);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format(
+ "Unable to hydrate GroupByKey windowing strategy %s.", windowingStrategyProto),
+ e);
+ }
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> ret =
+ (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+ return ret;
+ }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
index cf8fa18..c5dcd9d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
@@ -17,11 +17,11 @@
*/
package org.apache.beam.runners.samza.translation;
-import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
+import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
@@ -32,14 +32,19 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
/**
* Translates {@link org.apache.beam.sdk.io.Read} to Samza input {@link
* org.apache.samza.operators.MessageStream}.
*/
-public class ReadTranslator<T>
- implements TransformTranslator<PTransform<PBegin, PCollection<T>>>,
- TransformConfigGenerator<PTransform<PBegin, PCollection<T>>> {
+public class ReadTranslator<T> implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
@Override
public void translate(
@@ -47,39 +52,40 @@ public class ReadTranslator<T>
TransformHierarchy.Node node,
TranslationContext ctx) {
final PCollection<T> output = ctx.getOutput(transform);
- ctx.registerInputMessageStream(output);
- }
-
- @Override
- public Map<String, String> createConfig(
- PTransform<PBegin, PCollection<T>> transform,
- TransformHierarchy.Node node,
- ConfigContext ctx) {
- final String id = ctx.getOutputId(node);
- final PCollection<T> output = ctx.getOutput(transform);
final Coder<WindowedValue<T>> coder = SamzaCoders.of(output);
final Source<?> source =
transform instanceof Read.Unbounded
? ((Read.Unbounded) transform).getSource()
: ((Read.Bounded) transform).getSource();
+ final String id = ctx.getIdForPValue(output);
- final Map<String, String> config = new HashMap<>();
- final String systemPrefix = "systems." + id;
- final String streamPrefix = "streams." + id;
-
- config.put(systemPrefix + ".source", Base64Serializer.serializeUnchecked(source));
- config.put(systemPrefix + ".coder", Base64Serializer.serializeUnchecked(coder));
- config.put(systemPrefix + ".stepName", node.getFullName());
+ // Create system descriptor
+ final GenericSystemDescriptor systemDescriptor;
+ if (source instanceof BoundedSource) {
+ systemDescriptor =
+ new GenericSystemDescriptor(id, BoundedSourceSystem.Factory.class.getName());
+ } else {
+ systemDescriptor =
+ new GenericSystemDescriptor(id, UnboundedSourceSystem.Factory.class.getName());
+ }
- config.put(streamPrefix + ".samza.system", id);
+ final Map<String, String> systemConfig =
+ ImmutableMap.of(
+ "source", Base64Serializer.serializeUnchecked(source),
+ "coder", Base64Serializer.serializeUnchecked(coder),
+ "stepName", node.getFullName());
+ systemDescriptor.withSystemConfigs(systemConfig);
+ // Create stream descriptor
+ @SuppressWarnings("unchecked")
+ final Serde<KV<?, OpMessage<T>>> kvSerde =
+ (Serde) KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+ final GenericInputDescriptor<KV<?, OpMessage<T>>> inputDescriptor =
+ systemDescriptor.getInputDescriptor(id, kvSerde);
if (source instanceof BoundedSource) {
- config.put(streamPrefix + ".samza.bounded", "true");
- config.put(systemPrefix + ".samza.factory", BoundedSourceSystem.Factory.class.getName());
- } else {
- config.put(systemPrefix + ".samza.factory", UnboundedSourceSystem.Factory.class.getName());
+ inputDescriptor.isBounded();
}
- return config;
+ ctx.registerInputMessageStream(output, inputDescriptor);
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java
index 74c157a..76612e8 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java
@@ -29,11 +29,20 @@ import org.apache.samza.operators.MessageStream;
/** Translates {@link SamzaPublishView} to a view {@link MessageStream} as side input. */
class SamzaPublishViewTranslator<ElemT, ViewT>
implements TransformTranslator<SamzaPublishView<ElemT, ViewT>> {
+
@Override
public void translate(
SamzaPublishView<ElemT, ViewT> transform,
TransformHierarchy.Node node,
TranslationContext ctx) {
+ doTranslate(transform, node, ctx);
+ }
+
+ private static <ElemT, ViewT> void doTranslate(
+ SamzaPublishView<ElemT, ViewT> transform,
+ TransformHierarchy.Node node,
+ TranslationContext ctx) {
+
final PCollection<List<ElemT>> input = ctx.getInput(transform);
final MessageStream<OpMessage<Iterable<ElemT>>> inputStream = ctx.getMessageStream(input);
@SuppressWarnings("unchecked")
@@ -51,9 +60,9 @@ class SamzaPublishViewTranslator<ElemT, ViewT>
: elementStream.broadcast(
SamzaCoders.toSerde(elementCoder), "view-" + ctx.getCurrentTopologicalId());
+ final String viewId = ctx.getViewId(transform.getView());
final MessageStream<OpMessage<Iterable<ElemT>>> outputStream =
- broadcastStream.map(
- element -> OpMessage.ofSideInput(ctx.getViewId(transform.getView()), element));
+ broadcastStream.map(element -> OpMessage.ofSideInput(viewId, element));
ctx.registerViewStream(transform.getView(), outputStream);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
index 27d16c8..4120ef0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
@@ -19,22 +19,43 @@ package org.apache.beam.runners.samza.translation;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.WatermarkMessage;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,57 +66,61 @@ import org.slf4j.LoggerFactory;
*/
public class TranslationContext {
public static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
- private final StreamGraph streamGraph;
+ private final StreamApplicationDescriptor appDescriptor;
private final Map<PValue, MessageStream<?>> messsageStreams = new HashMap<>();
private final Map<PCollectionView<?>, MessageStream<?>> viewStreams = new HashMap<>();
private final Map<PValue, String> idMap;
private final Map<String, MessageStream> registeredInputStreams = new HashMap<>();
private final Map<String, Table> registeredTables = new HashMap<>();
- private final PValue dummySource;
private final SamzaPipelineOptions options;
private AppliedPTransform<?, ?, ?> currentTransform;
private int topologicalId;
public TranslationContext(
- StreamGraph streamGraph,
+ StreamApplicationDescriptor appDescriptor,
Map<PValue, String> idMap,
- SamzaPipelineOptions options,
- PValue dummySource) {
- this.streamGraph = streamGraph;
+ SamzaPipelineOptions options) {
+ this.appDescriptor = appDescriptor;
this.idMap = idMap;
this.options = options;
- this.dummySource = dummySource;
}
- public <OutT> void registerInputMessageStream(PValue pvalue) {
- // We only register dummySource if it is actually used (determined by a call to getDummyStream).
- if (!pvalue.equals(dummySource)) {
- doRegisterInputMessageStream(pvalue, getIdForPValue(pvalue));
- }
- }
+ public <OutT> void registerInputMessageStream(
+ PValue pvalue,
+ InputDescriptor<org.apache.samza.operators.KV<?, OpMessage<OutT>>, ?> inputDescriptor) {
+ // we want to register it with the Samza graph only once per i/o stream
+ final String streamId = inputDescriptor.getStreamId();
+ if (registeredInputStreams.containsKey(streamId)) {
+ MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
+ LOG.info(
+ String.format(
+ "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
+ streamId, messageStream, pvalue));
+ registerMessageStream(pvalue, messageStream);
- public <OutT> void registerInputMessageStreamById(PValue pvalue, String streamId) {
- // We only register dummySource if it is actually used (determined by a call to getDummyStream).
- if (!pvalue.equals(dummySource)) {
- doRegisterInputMessageStream(pvalue, streamId);
+ return;
}
+ @SuppressWarnings("unchecked")
+ final MessageStream<OpMessage<OutT>> typedStream =
+ getValueStream(appDescriptor.getInputStream(inputDescriptor));
+
+ registerMessageStream(pvalue, typedStream);
+ registeredInputStreams.put(streamId, typedStream);
}
public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
if (messsageStreams.containsKey(pvalue)) {
throw new IllegalArgumentException("Stream already registered for pvalue: " + pvalue);
}
-
messsageStreams.put(pvalue, stream);
}
+ // Add a dummy stream for use in special cases (TestStream, empty flatten)
public MessageStream<OpMessage<String>> getDummyStream() {
- if (!messsageStreams.containsKey(dummySource)) {
- doRegisterInputMessageStream(dummySource, getIdForPValue(dummySource));
- }
-
- return getMessageStream(dummySource);
+ InputDescriptor<OpMessage<String>, ?> dummyInput =
+ createDummyStreamDescriptor(UUID.randomUUID().toString());
+ return appDescriptor.getInputStream(dummyInput);
}
public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
@@ -177,43 +202,53 @@ public class TranslationContext {
return this.options;
}
- public <OutT> OutputStream<OutT> getOutputStreamById(String outputStreamId) {
- return streamGraph.getOutputStream(outputStreamId);
+ public <OutT> OutputStream<OutT> getOutputStream(OutputDescriptor<OutT, ?> outputDescriptor) {
+ return appDescriptor.getOutputStream(outputDescriptor);
}
@SuppressWarnings("unchecked")
public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
return registeredTables.computeIfAbsent(
- tableDesc.getTableId(), id -> streamGraph.getTable(tableDesc));
+ tableDesc.getTableId(), id -> appDescriptor.getTable(tableDesc));
}
- private <OutT> void doRegisterInputMessageStream(PValue pvalue, String streamId) {
- // we want to register it with the Samza graph only once per i/o stream
- if (registeredInputStreams.containsKey(streamId)) {
- MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
- LOG.info(
- String.format(
- "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
- streamId, messageStream, pvalue));
- registerMessageStream(pvalue, messageStream);
-
- return;
- }
- @SuppressWarnings("unchecked")
- final MessageStream<OpMessage<OutT>> typedStream =
- streamGraph
- .<org.apache.samza.operators.KV<?, OpMessage<OutT>>>getInputStream(streamId)
- .map(org.apache.samza.operators.KV::getValue);
-
- registerMessageStream(pvalue, typedStream);
- registeredInputStreams.put(streamId, typedStream);
+ private static <T> MessageStream<T> getValueStream(
+ MessageStream<org.apache.samza.operators.KV<?, T>> input) {
+ return input.map(org.apache.samza.operators.KV::getValue);
}
- private String getIdForPValue(PValue pvalue) {
+ public String getIdForPValue(PValue pvalue) {
final String id = idMap.get(pvalue);
if (id == null) {
throw new IllegalArgumentException("No id mapping for value: " + pvalue);
}
return id;
}
+
+ /** The dummy stream created will only be used in Beam tests. */
+ private static InputDescriptor<OpMessage<String>, ?> createDummyStreamDescriptor(String id) {
+ final GenericSystemDescriptor dummySystem =
+ new GenericSystemDescriptor(id, InMemorySystemFactory.class.getName());
+ final GenericInputDescriptor<OpMessage<String>> dummyInput =
+ dummySystem.getInputDescriptor(id, new NoOpSerde<>());
+ dummyInput.withOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
+ final Config config = new MapConfig(dummyInput.toConfig(), dummySystem.toConfig());
+ final SystemFactory factory = new InMemorySystemFactory();
+ final StreamSpec dummyStreamSpec = new StreamSpec(id, id, id, 1);
+ factory.getAdmin(id, config).createStream(dummyStreamSpec);
+
+ final SystemProducer producer = factory.getProducer(id, config, null);
+ final SystemStream sysStream = new SystemStream(id, id);
+ final Consumer<Object> sendFn =
+ (msg) -> {
+ producer.send(id, new OutgoingMessageEnvelope(sysStream, 0, null, msg));
+ };
+ final WindowedValue<String> windowedValue =
+ WindowedValue.timestampedValueInGlobalWindow("dummy", new Instant());
+
+ sendFn.accept(OpMessage.ofElement(windowedValue));
+ sendFn.accept(new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+ sendFn.accept(new EndOfStreamMessage(null));
+ return dummyInput;
+ }
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java
index bb8066c..964f0fe 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java
@@ -55,6 +55,7 @@ public class BoundedSourceSystemTest {
// A reasonable time to wait to get all messages from the bounded source assuming no blocking.
private static final long DEFAULT_TIMEOUT_MILLIS = 1000;
+ private static final String NULL_STRING = null;
@Test
public void testConsumerStartStop() throws IOException, InterruptedException {
@@ -215,9 +216,9 @@ public class BoundedSourceSystemTest {
final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source, 3);
- consumer.register(ssp(0), null);
- consumer.register(ssp(1), null);
- consumer.register(ssp(2), null);
+ consumer.register(ssp(0), NULL_STRING);
+ consumer.register(ssp(1), NULL_STRING);
+ consumer.register(ssp(2), NULL_STRING);
consumer.start();
final Set<String> offsets = new HashSet<>();
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
index 9b7b217..53e7ed7 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
@@ -56,6 +56,7 @@ public class UnboundedSourceSystemTest {
// A reasonable time to wait to get all messages from the source assuming no blocking.
private static final long DEFAULT_TIMEOUT_MILLIS = 1000;
private static final long DEFAULT_WATERMARK_TIMEOUT_MILLIS = 1000;
+ private static final String NULL_STRING = null;
private static final SystemStreamPartition DEFAULT_SSP =
new SystemStreamPartition("default-system", "default-system", new Partition(0));
@@ -86,7 +87,7 @@ public class UnboundedSourceSystemTest {
final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
createConsumer(source);
- consumer.register(DEFAULT_SSP, null);
+ consumer.register(DEFAULT_SSP, NULL_STRING);
consumer.start();
assertEquals(
Arrays.asList(
@@ -110,7 +111,7 @@ public class UnboundedSourceSystemTest {
final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
createConsumer(source);
- consumer.register(DEFAULT_SSP, null);
+ consumer.register(DEFAULT_SSP, NULL_STRING);
consumer.start();
assertEquals(
Arrays.asList(
@@ -133,7 +134,7 @@ public class UnboundedSourceSystemTest {
final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
createConsumer(source);
- consumer.register(DEFAULT_SSP, null);
+ consumer.register(DEFAULT_SSP, NULL_STRING);
consumer.start();
assertEquals(
Arrays.asList(
@@ -161,7 +162,7 @@ public class UnboundedSourceSystemTest {
final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
createConsumer(source);
- consumer.register(DEFAULT_SSP, null);
+ consumer.register(DEFAULT_SSP, NULL_STRING);
consumer.start();
assertEquals(
Arrays.asList(
@@ -193,7 +194,7 @@ public class UnboundedSourceSystemTest {
final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
createConsumer(source);
- consumer.register(DEFAULT_SSP, null);
+ consumer.register(DEFAULT_SSP, NULL_STRING);
consumer.start();
// consume to the first watermark
assertEquals(
@@ -223,7 +224,7 @@ public class UnboundedSourceSystemTest {
final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
createConsumer(source);
- consumer.register(DEFAULT_SSP, null);
+ consumer.register(DEFAULT_SSP, NULL_STRING);
consumer.start();
expectWrappedException(
exception,
@@ -271,7 +272,7 @@ public class UnboundedSourceSystemTest {
final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
createConsumer(source);
- consumer.register(DEFAULT_SSP, null);
+ consumer.register(DEFAULT_SSP, NULL_STRING);
consumer.start();
assertEquals(
Collections.singletonList(createElementMessage(DEFAULT_SSP, offset(0), "before", now)),
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
index ca085c2..f75a758 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
@@ -56,8 +56,10 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -194,7 +196,9 @@ public class SamzaStoreStateInternalsTest implements Serializable {
File storeDir,
MetricsRegistry registry,
SystemStreamPartition changeLogSystemStreamPartition,
- SamzaContainerContext containerContext) {
+ JobContext jobContext,
+ ContainerContext containerContext,
+ StorageEngineFactory.StoreMode readWrite) {
KeyValueStoreMetrics metrics = new KeyValueStoreMetrics(storeName, registry);
return new TestStore(metrics);
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 6e8b9be..25ef315 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -39,11 +39,12 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.TaskContext;
import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
import org.apache.samza.storage.kv.RocksDbKeyValueStore;
-import org.apache.samza.task.TaskContext;
import org.joda.time.Instant;
import org.junit.Test;
import org.rocksdb.FlushOptions;
@@ -73,7 +74,7 @@ public class SamzaTimerInternalsFactoryTest {
private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(
SamzaPipelineOptions pipelineOptions, RocksDbKeyValueStore store) {
final TaskContext context = mock(TaskContext.class);
- when(context.getStore(anyString())).thenReturn(store);
+ when(context.getStore(anyString())).thenReturn((KeyValueStore) store);
final TupleTag<?> mainOutputTag = new TupleTag<>("output");
return SamzaStoreStateInternals.createStateInternalFactory(
@@ -81,7 +82,7 @@ public class SamzaTimerInternalsFactoryTest {
}
private static SamzaTimerInternalsFactory<String> createTimerInternalsFactory(
- TimerRegistry<KeyedTimerData<String>> timerRegistry,
+ Scheduler<KeyedTimerData<String>> timerRegistry,
String timerStateId,
SamzaPipelineOptions pipelineOptions,
RocksDbKeyValueStore store) {
@@ -98,11 +99,11 @@ public class SamzaTimerInternalsFactoryTest {
pipelineOptions);
}
- private static class TestTimerRegistry implements TimerRegistry<KeyedTimerData<String>> {
+ private static class TestTimerRegistry implements Scheduler<KeyedTimerData<String>> {
private final List<KeyedTimerData<String>> timers = new ArrayList<>();
@Override
- public void register(KeyedTimerData<String> key, long timestamp) {
+ public void schedule(KeyedTimerData<String> key, long timestamp) {
timers.add(key);
}