You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2019/03/06 02:36:59 UTC
[beam] branch master updated: [BEAM-6767] SamzaRunner: Add more
parameters to SamzaPipelineOptions (#7983)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 179813b [BEAM-6767] SamzaRunner: Add more parameters to SamzaPipelineOptions (#7983)
179813b is described below
commit 179813b6959b186411225924e67ba0146ef1743e
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Mar 5 18:36:45 2019 -0800
[BEAM-6767] SamzaRunner: Add more parameters to SamzaPipelineOptions (#7983)
---
.../beam/runners/samza/SamzaPipelineOptions.java | 18 ++++
.../runners/samza/adapter/BoundedSourceSystem.java | 22 ++--
.../samza/adapter/UnboundedSourceSystem.java | 20 +++-
.../runners/samza/runtime/SamzaDoFnRunners.java | 6 +-
.../samza/runtime/SamzaStoreStateInternals.java | 5 +-
.../runners/samza/translation/ConfigBuilder.java | 34 ++++--
.../translation/ParDoBoundMultiTranslator.java | 9 ++
.../samza/translation/ConfigGeneratorTest.java | 116 +++++++++++++++++++++
8 files changed, 207 insertions(+), 23 deletions(-)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index 65204c4..e5955a6 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -47,6 +47,12 @@ public interface SamzaPipelineOptions extends PipelineOptions {
void setConfigOverride(Map<String, String> configs);
+ @Description("The instance name of the job")
+ @Default.String("1")
+ String getJobInstance();
+
+ void setJobInstance(String instance);
+
@Description("The interval to check for watermarks in milliseconds.")
@Default.Long(1000)
long getWatermarkInterval();
@@ -70,4 +76,16 @@ public interface SamzaPipelineOptions extends PipelineOptions {
int getStoreBatchGetSize();
void setStoreBatchGetSize(int storeBatchGetSize);
+
+ @Description("Enable/disable Beam metrics in Samza Runner")
+ @Default.Boolean(true)
+ Boolean getEnableMetrics();
+
+ void setEnableMetrics(Boolean enableMetrics);
+
+ @Description("The config for state to be durable")
+ @Default.Boolean(false)
+ Boolean getStateDurable();
+
+ void setStateDurable(Boolean stateDurable);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
index 78a88fb..dfcea8b 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
@@ -180,10 +180,12 @@ public class BoundedSourceSystem {
"Attempted to call start without assigned system stream partitions");
}
- int capacity = pipelineOptions.getSystemBufferSize();
- readerTask =
- new ReaderTask<>(
- readerToSsp, capacity, new FnWithMetricsWrapper(metricsContainer, stepName));
+ final int capacity = pipelineOptions.getSystemBufferSize();
+ final FnWithMetricsWrapper metricsWrapper =
+ pipelineOptions.getEnableMetrics()
+ ? new FnWithMetricsWrapper(metricsContainer, stepName)
+ : null;
+ readerTask = new ReaderTask<>(readerToSsp, capacity, metricsWrapper);
final Thread thread =
new Thread(readerTask, "bounded-source-system-consumer-" + NEXT_ID.getAndIncrement());
thread.start();
@@ -255,7 +257,7 @@ public class BoundedSourceSystem {
final Set<BoundedReader<T>> availableReaders = new HashSet<>(readerToSsp.keySet());
try {
for (BoundedReader<T> reader : readerToSsp.keySet()) {
- boolean hasData = metricsWrapper.wrap(reader::start);
+ boolean hasData = invoke(reader::start);
if (hasData) {
enqueueMessage(reader);
} else {
@@ -269,7 +271,7 @@ public class BoundedSourceSystem {
final Iterator<BoundedReader<T>> iter = availableReaders.iterator();
while (iter.hasNext()) {
final BoundedReader<T> reader = iter.next();
- final boolean hasData = metricsWrapper.wrap(reader::advance);
+ final boolean hasData = invoke(reader::advance);
if (hasData) {
enqueueMessage(reader);
} else {
@@ -297,6 +299,14 @@ public class BoundedSourceSystem {
}
}
+ private <X> X invoke(FnWithMetricsWrapper.SupplierWithException<X> fn) throws Exception {
+ if (metricsWrapper != null) {
+ return metricsWrapper.wrap(fn);
+ } else {
+ return fn.get();
+ }
+ }
+
private void enqueueMessage(BoundedReader<T> reader) throws InterruptedException {
final T value = reader.getCurrent();
final WindowedValue<T> windowedValue =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index 88affe4..a3f958b 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -192,13 +192,17 @@ public class UnboundedSourceSystem {
"Attempted to call start without assigned system stream partitions");
}
+ final FnWithMetricsWrapper metricsWrapper =
+ pipelineOptions.getEnableMetrics()
+ ? new FnWithMetricsWrapper(metricsContainer, stepName)
+ : null;
readerTask =
new ReaderTask<>(
readerToSsp,
checkpointMarkCoder,
pipelineOptions.getSystemBufferSize(),
pipelineOptions.getWatermarkInterval(),
- new FnWithMetricsWrapper(metricsContainer, stepName));
+ metricsWrapper);
final Thread thread =
new Thread(readerTask, "unbounded-source-system-consumer-" + NEXT_ID.getAndIncrement());
thread.start();
@@ -286,7 +290,7 @@ public class UnboundedSourceSystem {
try {
for (UnboundedReader reader : readers) {
- final boolean hasData = metricsWrapper.wrap(reader::start);
+ final boolean hasData = invoke(reader::start);
if (hasData) {
available.acquire();
enqueueMessage(reader);
@@ -296,7 +300,7 @@ public class UnboundedSourceSystem {
while (running) {
boolean elementAvailable = false;
for (UnboundedReader reader : readers) {
- final boolean hasData = metricsWrapper.wrap(reader::advance);
+ final boolean hasData = invoke(reader::advance);
if (hasData) {
while (!available.tryAcquire(
1,
@@ -342,6 +346,14 @@ public class UnboundedSourceSystem {
}
}
+ private <X> X invoke(FnWithMetricsWrapper.SupplierWithException<X> fn) throws Exception {
+ if (metricsWrapper != null) {
+ return metricsWrapper.wrap(fn);
+ } else {
+ return fn.get();
+ }
+ }
+
private void updateWatermark() throws InterruptedException {
final long time = System.currentTimeMillis();
if (time - lastWatermarkTime > watermarkInterval) {
@@ -420,7 +432,7 @@ public class UnboundedSourceSystem {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@SuppressWarnings("unchecked")
final CheckpointMarkT checkpointMark =
- (CheckpointMarkT) metricsWrapper.wrap(reader::getCheckpointMark);
+ (CheckpointMarkT) invoke(reader::getCheckpointMark);
checkpointMarkCoder.encode(checkpointMark, baos);
return Base64.getEncoder().encodeToString(baos.toByteArray());
} catch (Exception e) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 096ae7b..e50fe30 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -106,8 +106,10 @@ public class SamzaDoFnRunners {
doFnSchemaInformation);
final DoFnRunner<InT, FnOutT> doFnRunnerWithMetrics =
- DoFnRunnerWithMetrics.wrap(
- underlyingRunner, executionContext.getMetricsContainer(), stepName);
+ pipelineOptions.getEnableMetrics()
+ ? DoFnRunnerWithMetrics.wrap(
+ underlyingRunner, executionContext.getMetricsContainer(), stepName)
+ : underlyingRunner;
if (keyedInternals != null) {
final DoFnRunner<InT, FnOutT> statefulDoFnRunner =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 94e319c..0c545fe 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -833,7 +833,10 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
final Instant currentValue = readInternal();
final Instant combinedValue =
currentValue == null ? value : timestampCombiner.combine(currentValue, value);
- writeInternal(combinedValue);
+
+ if (!combinedValue.equals(currentValue)) {
+ writeInternal(combinedValue);
+ }
}
@Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index 478357e..a3578ed 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -66,12 +66,14 @@ public class ConfigBuilder {
public Config build() {
try {
- config.putAll(systemStoreConfig());
+ config.putAll(systemStoreConfig(options));
// apply user configs
config.putAll(createUserConfig(options));
config.put(JobConfig.JOB_NAME(), options.getJobName());
+ config.put(JobConfig.JOB_ID(), options.getJobInstance());
+
config.put(
"beamPipelineOptions",
Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));
@@ -151,14 +153,26 @@ public class ConfigBuilder {
.build();
}
- private static Map<String, String> systemStoreConfig() {
- return ImmutableMap.<String, String>builder()
- .put(
- "stores.beamStore.factory",
- "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
- .put("stores.beamStore.key.serde", "byteSerde")
- .put("stores.beamStore.msg.serde", "byteSerde")
- .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName())
- .build();
+ private static Map<String, String> systemStoreConfig(SamzaPipelineOptions options) {
+ ImmutableMap.Builder<String, String> configBuilder =
+ ImmutableMap.<String, String>builder()
+ .put(
+ "stores.beamStore.factory",
+ "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
+ .put("stores.beamStore.key.serde", "byteSerde")
+ .put("stores.beamStore.msg.serde", "byteSerde")
+ .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName());
+
+ if (options.getStateDurable()) {
+ configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore"));
+ configBuilder.put("job.host-affinity.enabled", "true");
+ }
+
+ return configBuilder.build();
+ }
+
+ static String getChangelogTopic(SamzaPipelineOptions options, String storeName) {
+ return String.format(
+ "%s-%s-%s-changelog", options.getJobName(), options.getJobInstance(), storeName);
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index 6e79546..be38fec 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -32,6 +32,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpAdapter;
@@ -272,6 +273,8 @@ class ParDoBoundMultiTranslator<InT, OutT>
ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, ConfigContext ctx) {
final Map<String, String> config = new HashMap<>();
final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ final SamzaPipelineOptions options = ctx.getPipelineOptions();
+
if (signature.usesState()) {
// set up user state configs
for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
@@ -281,6 +284,12 @@ class ParDoBoundMultiTranslator<InT, OutT>
"org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
config.put("stores." + storeId + ".key.serde", "byteSerde");
config.put("stores." + storeId + ".msg.serde", "byteSerde");
+
+ if (options.getStateDurable()) {
+ config.put(
+ "stores." + storeId + ".changelog",
+ ConfigBuilder.getChangelogTopic(options, storeId));
+ }
}
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
new file mode 100644
index 0000000..881fc76
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Map;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.junit.Test;
+
+/** Tests config generation for {@link org.apache.beam.runners.samza.SamzaRunner}. */
+public class ConfigGeneratorTest {
+
+ @Test
+ public void testBeamStoreConfig() {
+ SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ options.setJobName("TestStoreConfig");
+ options.setRunner(SamzaRunner.class);
+
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(Create.of(1, 2, 3)).apply(Sum.integersGlobally());
+
+ pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
+
+ final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final ConfigBuilder configBuilder = new ConfigBuilder(options);
+ SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+ final Config config = configBuilder.build();
+
+ assertEquals(
+ RocksDbKeyValueStorageEngineFactory.class.getName(),
+ config.get("stores.beamStore.factory"));
+ assertEquals("byteSerde", config.get("stores.beamStore.key.serde"));
+ assertEquals("byteSerde", config.get("stores.beamStore.msg.serde"));
+ assertNull(config.get("stores.beamStore.changelog"));
+
+ options.setStateDurable(true);
+ SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+ final Config config2 = configBuilder.build();
+ assertEquals(
+ "TestStoreConfig-1-beamStore-changelog", config2.get("stores.beamStore.changelog"));
+ }
+
+ @Test
+ public void testUserStoreConfig() {
+ SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+ options.setJobName("TestStoreConfig");
+ options.setRunner(SamzaRunner.class);
+
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline
+ .apply(
+ Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())))
+ .apply(
+ ParDo.of(
+ new DoFn<KV<String, String>, Void>() {
+ private static final String testState = "testState";
+
+ @StateId(testState)
+ private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext context, @StateId(testState) ValueState<Integer> state) {}
+ }));
+
+ final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+ final ConfigBuilder configBuilder = new ConfigBuilder(options);
+ SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+ final Config config = configBuilder.build();
+
+ assertEquals(
+ RocksDbKeyValueStorageEngineFactory.class.getName(),
+ config.get("stores.testState.factory"));
+ assertEquals("byteSerde", config.get("stores.testState.key.serde"));
+ assertEquals("byteSerde", config.get("stores.testState.msg.serde"));
+ assertNull(config.get("stores.testState.changelog"));
+
+ options.setStateDurable(true);
+ SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+ final Config config2 = configBuilder.build();
+ assertEquals(
+ "TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog"));
+ }
+}