You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2019/03/06 02:36:59 UTC

[beam] branch master updated: [BEAM-6767] SamzaRunner: Add more parameters to SamzaPipelineOptions (#7983)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 179813b  [BEAM-6767] SamzaRunner: Add more parameters to SamzaPipelineOptions (#7983)
179813b is described below

commit 179813b6959b186411225924e67ba0146ef1743e
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Tue Mar 5 18:36:45 2019 -0800

    [BEAM-6767] SamzaRunner: Add more parameters to SamzaPipelineOptions (#7983)
---
 .../beam/runners/samza/SamzaPipelineOptions.java   |  18 ++++
 .../runners/samza/adapter/BoundedSourceSystem.java |  22 ++--
 .../samza/adapter/UnboundedSourceSystem.java       |  20 +++-
 .../runners/samza/runtime/SamzaDoFnRunners.java    |   6 +-
 .../samza/runtime/SamzaStoreStateInternals.java    |   5 +-
 .../runners/samza/translation/ConfigBuilder.java   |  34 ++++--
 .../translation/ParDoBoundMultiTranslator.java     |   9 ++
 .../samza/translation/ConfigGeneratorTest.java     | 116 +++++++++++++++++++++
 8 files changed, 207 insertions(+), 23 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index 65204c4..e5955a6 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -47,6 +47,12 @@ public interface SamzaPipelineOptions extends PipelineOptions {
 
   void setConfigOverride(Map<String, String> configs);
 
+  @Description("The instance name of the job")
+  @Default.String("1")
+  String getJobInstance();
+
+  void setJobInstance(String instance);
+
   @Description("The interval to check for watermarks in milliseconds.")
   @Default.Long(1000)
   long getWatermarkInterval();
@@ -70,4 +76,16 @@ public interface SamzaPipelineOptions extends PipelineOptions {
   int getStoreBatchGetSize();
 
   void setStoreBatchGetSize(int storeBatchGetSize);
+
+  @Description("Enable/disable Beam metrics in Samza Runner")
+  @Default.Boolean(true)
+  Boolean getEnableMetrics();
+
+  void setEnableMetrics(Boolean enableMetrics);
+
+  @Description("The config for state to be durable")
+  @Default.Boolean(false)
+  Boolean getStateDurable();
+
+  void setStateDurable(Boolean stateDurable);
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
index 78a88fb..dfcea8b 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
@@ -180,10 +180,12 @@ public class BoundedSourceSystem {
             "Attempted to call start without assigned system stream partitions");
       }
 
-      int capacity = pipelineOptions.getSystemBufferSize();
-      readerTask =
-          new ReaderTask<>(
-              readerToSsp, capacity, new FnWithMetricsWrapper(metricsContainer, stepName));
+      final int capacity = pipelineOptions.getSystemBufferSize();
+      final FnWithMetricsWrapper metricsWrapper =
+          pipelineOptions.getEnableMetrics()
+              ? new FnWithMetricsWrapper(metricsContainer, stepName)
+              : null;
+      readerTask = new ReaderTask<>(readerToSsp, capacity, metricsWrapper);
       final Thread thread =
           new Thread(readerTask, "bounded-source-system-consumer-" + NEXT_ID.getAndIncrement());
       thread.start();
@@ -255,7 +257,7 @@ public class BoundedSourceSystem {
         final Set<BoundedReader<T>> availableReaders = new HashSet<>(readerToSsp.keySet());
         try {
           for (BoundedReader<T> reader : readerToSsp.keySet()) {
-            boolean hasData = metricsWrapper.wrap(reader::start);
+            boolean hasData = invoke(reader::start);
             if (hasData) {
               enqueueMessage(reader);
             } else {
@@ -269,7 +271,7 @@ public class BoundedSourceSystem {
             final Iterator<BoundedReader<T>> iter = availableReaders.iterator();
             while (iter.hasNext()) {
               final BoundedReader<T> reader = iter.next();
-              final boolean hasData = metricsWrapper.wrap(reader::advance);
+              final boolean hasData = invoke(reader::advance);
               if (hasData) {
                 enqueueMessage(reader);
               } else {
@@ -297,6 +299,14 @@ public class BoundedSourceSystem {
         }
       }
 
+      private <X> X invoke(FnWithMetricsWrapper.SupplierWithException<X> fn) throws Exception {
+        if (metricsWrapper != null) {
+          return metricsWrapper.wrap(fn);
+        } else {
+          return fn.get();
+        }
+      }
+
       private void enqueueMessage(BoundedReader<T> reader) throws InterruptedException {
         final T value = reader.getCurrent();
         final WindowedValue<T> windowedValue =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index 88affe4..a3f958b 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -192,13 +192,17 @@ public class UnboundedSourceSystem {
             "Attempted to call start without assigned system stream partitions");
       }
 
+      final FnWithMetricsWrapper metricsWrapper =
+          pipelineOptions.getEnableMetrics()
+              ? new FnWithMetricsWrapper(metricsContainer, stepName)
+              : null;
       readerTask =
           new ReaderTask<>(
               readerToSsp,
               checkpointMarkCoder,
               pipelineOptions.getSystemBufferSize(),
               pipelineOptions.getWatermarkInterval(),
-              new FnWithMetricsWrapper(metricsContainer, stepName));
+              metricsWrapper);
       final Thread thread =
           new Thread(readerTask, "unbounded-source-system-consumer-" + NEXT_ID.getAndIncrement());
       thread.start();
@@ -286,7 +290,7 @@ public class UnboundedSourceSystem {
 
         try {
           for (UnboundedReader reader : readers) {
-            final boolean hasData = metricsWrapper.wrap(reader::start);
+            final boolean hasData = invoke(reader::start);
             if (hasData) {
               available.acquire();
               enqueueMessage(reader);
@@ -296,7 +300,7 @@ public class UnboundedSourceSystem {
           while (running) {
             boolean elementAvailable = false;
             for (UnboundedReader reader : readers) {
-              final boolean hasData = metricsWrapper.wrap(reader::advance);
+              final boolean hasData = invoke(reader::advance);
               if (hasData) {
                 while (!available.tryAcquire(
                     1,
@@ -342,6 +346,14 @@ public class UnboundedSourceSystem {
         }
       }
 
+      private <X> X invoke(FnWithMetricsWrapper.SupplierWithException<X> fn) throws Exception {
+        if (metricsWrapper != null) {
+          return metricsWrapper.wrap(fn);
+        } else {
+          return fn.get();
+        }
+      }
+
       private void updateWatermark() throws InterruptedException {
         final long time = System.currentTimeMillis();
         if (time - lastWatermarkTime > watermarkInterval) {
@@ -420,7 +432,7 @@ public class UnboundedSourceSystem {
           final ByteArrayOutputStream baos = new ByteArrayOutputStream();
           @SuppressWarnings("unchecked")
           final CheckpointMarkT checkpointMark =
-              (CheckpointMarkT) metricsWrapper.wrap(reader::getCheckpointMark);
+              (CheckpointMarkT) invoke(reader::getCheckpointMark);
           checkpointMarkCoder.encode(checkpointMark, baos);
           return Base64.getEncoder().encodeToString(baos.toByteArray());
         } catch (Exception e) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 096ae7b..e50fe30 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -106,8 +106,10 @@ public class SamzaDoFnRunners {
             doFnSchemaInformation);
 
     final DoFnRunner<InT, FnOutT> doFnRunnerWithMetrics =
-        DoFnRunnerWithMetrics.wrap(
-            underlyingRunner, executionContext.getMetricsContainer(), stepName);
+        pipelineOptions.getEnableMetrics()
+            ? DoFnRunnerWithMetrics.wrap(
+                underlyingRunner, executionContext.getMetricsContainer(), stepName)
+            : underlyingRunner;
 
     if (keyedInternals != null) {
       final DoFnRunner<InT, FnOutT> statefulDoFnRunner =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 94e319c..0c545fe 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -833,7 +833,10 @@ public class SamzaStoreStateInternals<K> implements StateInternals {
       final Instant currentValue = readInternal();
       final Instant combinedValue =
           currentValue == null ? value : timestampCombiner.combine(currentValue, value);
-      writeInternal(combinedValue);
+
+      if (!combinedValue.equals(currentValue)) {
+        writeInternal(combinedValue);
+      }
     }
 
     @Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index 478357e..a3578ed 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -66,12 +66,14 @@ public class ConfigBuilder {
 
   public Config build() {
     try {
-      config.putAll(systemStoreConfig());
+      config.putAll(systemStoreConfig(options));
 
       // apply user configs
       config.putAll(createUserConfig(options));
 
       config.put(JobConfig.JOB_NAME(), options.getJobName());
+      config.put(JobConfig.JOB_ID(), options.getJobInstance());
+
       config.put(
           "beamPipelineOptions",
           Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));
@@ -151,14 +153,26 @@ public class ConfigBuilder {
         .build();
   }
 
-  private static Map<String, String> systemStoreConfig() {
-    return ImmutableMap.<String, String>builder()
-        .put(
-            "stores.beamStore.factory",
-            "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
-        .put("stores.beamStore.key.serde", "byteSerde")
-        .put("stores.beamStore.msg.serde", "byteSerde")
-        .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName())
-        .build();
+  private static Map<String, String> systemStoreConfig(SamzaPipelineOptions options) {
+    ImmutableMap.Builder<String, String> configBuilder =
+        ImmutableMap.<String, String>builder()
+            .put(
+                "stores.beamStore.factory",
+                "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory")
+            .put("stores.beamStore.key.serde", "byteSerde")
+            .put("stores.beamStore.msg.serde", "byteSerde")
+            .put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName());
+
+    if (options.getStateDurable()) {
+      configBuilder.put("stores.beamStore.changelog", getChangelogTopic(options, "beamStore"));
+      configBuilder.put("job.host-affinity.enabled", "true");
+    }
+
+    return configBuilder.build();
+  }
+
+  static String getChangelogTopic(SamzaPipelineOptions options, String storeName) {
+    return String.format(
+        "%s-%s-%s-changelog", options.getJobName(), options.getJobInstance(), storeName);
   }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index 6e79546..be38fec 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -32,6 +32,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.runtime.DoFnOp;
 import org.apache.beam.runners.samza.runtime.Op;
 import org.apache.beam.runners.samza.runtime.OpAdapter;
@@ -272,6 +273,8 @@ class ParDoBoundMultiTranslator<InT, OutT>
       ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, ConfigContext ctx) {
     final Map<String, String> config = new HashMap<>();
     final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    final SamzaPipelineOptions options = ctx.getPipelineOptions();
+
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
@@ -281,6 +284,12 @@ class ParDoBoundMultiTranslator<InT, OutT>
             "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
         config.put("stores." + storeId + ".key.serde", "byteSerde");
         config.put("stores." + storeId + ".msg.serde", "byteSerde");
+
+        if (options.getStateDurable()) {
+          config.put(
+              "stores." + storeId + ".changelog",
+              ConfigBuilder.getChangelogTopic(options, storeId));
+        }
       }
     }
 
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
new file mode 100644
index 0000000..881fc76
--- /dev/null
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Map;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.junit.Test;
+
+/** Tests config generation for {@link org.apache.beam.runners.samza.SamzaRunner}. */
+public class ConfigGeneratorTest {
+
+  @Test
+  public void testBeamStoreConfig() {
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setJobName("TestStoreConfig");
+    options.setRunner(SamzaRunner.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1, 2, 3)).apply(Sum.integersGlobally());
+
+    pipeline.replaceAll(SamzaTransformOverrides.getDefaultOverrides());
+
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final ConfigBuilder configBuilder = new ConfigBuilder(options);
+    SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+    final Config config = configBuilder.build();
+
+    assertEquals(
+        RocksDbKeyValueStorageEngineFactory.class.getName(),
+        config.get("stores.beamStore.factory"));
+    assertEquals("byteSerde", config.get("stores.beamStore.key.serde"));
+    assertEquals("byteSerde", config.get("stores.beamStore.msg.serde"));
+    assertNull(config.get("stores.beamStore.changelog"));
+
+    options.setStateDurable(true);
+    SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+    final Config config2 = configBuilder.build();
+    assertEquals(
+        "TestStoreConfig-1-beamStore-changelog", config2.get("stores.beamStore.changelog"));
+  }
+
+  @Test
+  public void testUserStoreConfig() {
+    SamzaPipelineOptions options = PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
+    options.setJobName("TestStoreConfig");
+    options.setRunner(SamzaRunner.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline
+        .apply(
+            Create.empty(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())))
+        .apply(
+            ParDo.of(
+                new DoFn<KV<String, String>, Void>() {
+                  private static final String testState = "testState";
+
+                  @StateId(testState)
+                  private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
+
+                  @ProcessElement
+                  public void processElement(
+                      ProcessContext context, @StateId(testState) ValueState<Integer> state) {}
+                }));
+
+    final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final ConfigBuilder configBuilder = new ConfigBuilder(options);
+    SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+    final Config config = configBuilder.build();
+
+    assertEquals(
+        RocksDbKeyValueStorageEngineFactory.class.getName(),
+        config.get("stores.testState.factory"));
+    assertEquals("byteSerde", config.get("stores.testState.key.serde"));
+    assertEquals("byteSerde", config.get("stores.testState.msg.serde"));
+    assertNull(config.get("stores.testState.changelog"));
+
+    options.setStateDurable(true);
+    SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
+    final Config config2 = configBuilder.build();
+    assertEquals(
+        "TestStoreConfig-1-testState-changelog", config2.get("stores.testState.changelog"));
+  }
+}