You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2019/11/18 21:50:38 UTC

[samza] branch master updated: SAMZA-2386: Get store names should return correct store names in the presence of side inputs (#1220)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 33fc0b8  SAMZA-2386: Get store names should return correct store names in the presence of side inputs (#1220)
33fc0b8 is described below

commit 33fc0b8b1307f39486319bc80f5dc55b7c8570a8
Author: mynameborat <bh...@gmail.com>
AuthorDate: Mon Nov 18 13:50:30 2019 -0800

    SAMZA-2386: Get store names should return correct store names in the presence of side inputs (#1220)
---
 .../main/java/org/apache/samza/config/StorageConfig.java   |  7 +++++--
 .../java/org/apache/samza/config/TestStorageConfig.java    | 14 +++++++++++++-
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 4910a7f..b5687c8 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -40,6 +40,7 @@ import static com.google.common.base.Preconditions.*;
 public class StorageConfig extends MapConfig {
   private static final String FACTORY_SUFFIX = ".factory";
   private static final String CHANGELOG_SUFFIX = ".changelog";
+  private static final String SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX = ".side.inputs.processor.factory";
   private static final String STORE_PREFIX = "stores.";
 
   public static final String FACTORY = STORE_PREFIX + "%s" + FACTORY_SUFFIX;
@@ -67,7 +68,7 @@ public class StorageConfig extends MapConfig {
   static final String ACCESSLOG_ENABLED = STORE_PREFIX + "%s.accesslog.enabled";
   static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50;
   static final String SIDE_INPUTS = STORE_PREFIX + "%s.side.inputs";
-  static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + "%s.side.inputs.processor.factory";
+  static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + "%s" + SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX;
   static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE =
       STORE_PREFIX + "%s.side.inputs.processor.serialized.instance";
   static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
@@ -84,7 +85,9 @@ public class StorageConfig extends MapConfig {
     Config subConfig = subset(STORE_PREFIX, true);
     List<String> storeNames = new ArrayList<>();
     for (String key : subConfig.keySet()) {
-      if (key.endsWith(FACTORY_SUFFIX)) {
+      if (key.endsWith(SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX)) {
+        storeNames.add(key.substring(0, key.length() - SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX.length()));
+      } else if (key.endsWith(FACTORY_SUFFIX)) {
         storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
       }
     }
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 713aa49..baecf99 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaException;
 import org.junit.Test;
@@ -44,6 +45,7 @@ public class TestStorageConfig {
     // empty config, so no stores
     assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig()).getStoreNames());
 
+    Set<String> expectedStoreNames = ImmutableSet.of(STORE_NAME0, STORE_NAME1);
     // has stores
     StorageConfig storageConfig = new StorageConfig(new MapConfig(
         ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "store0.factory.class",
@@ -51,7 +53,17 @@ public class TestStorageConfig {
     List<String> actual = storageConfig.getStoreNames();
     // ordering shouldn't matter
     assertEquals(2, actual.size());
-    assertEquals(ImmutableSet.of(STORE_NAME0, STORE_NAME1), ImmutableSet.copyOf(actual));
+    assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+
+    // has side input stores
+    StorageConfig config = new StorageConfig(new MapConfig(
+        ImmutableMap.of(String.format(FACTORY, STORE_NAME0), "store0.factory.class",
+            String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME1), "store1.factory.class")));
+
+    actual = storageConfig.getStoreNames();
+
+    assertEquals(2, actual.size());
+    assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
   }
 
   @Test