You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/09/20 18:19:14 UTC

samza git commit: SAMZA-1854: Changed caching table descriptor to take table descriptor instead of run-time objects

Repository: samza
Updated Branches:
  refs/heads/master d89391231 -> db6996ed9


SAMZA-1854: Changed caching table descriptor to take table descriptor instead of run-time objects

As per the subject, changed the caching table descriptor to take table descriptors instead of run-time table objects:
 - Added BaseHybridTableDescriptor, which models a hybrid table that may contain other tables
 - Modified StreamApplicationDescriptorImpl to also include tables contained within a hybrid table

Author: Wei Song <ws...@linkedin.com>

Reviewers: Jagadish Venkatraman <jv...@linkedin.com>

Closes #645 from weisong44/SAMZA-1854 and squashes the following commits:

2c0d1362 [Wei Song] Updated based on review comments
dd18bbee [Wei Song] Merge branch 'master' into SAMZA-1854
a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master'
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
a87a9b04 [Wei Song] SAMZA-1854: Changed caching table descriptor to take table descriptor instead of run-time objects
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/db6996ed
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/db6996ed
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/db6996ed

Branch: refs/heads/master
Commit: db6996ed99bb6a677f588f247b373345077580b0
Parents: d893912
Author: Wei Song <ws...@linkedin.com>
Authored: Thu Sep 20 11:19:06 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Thu Sep 20 11:19:06 2018 -0700

----------------------------------------------------------------------
 .../StreamApplicationDescriptorImpl.java        |  8 ++++
 .../table/caching/CachingTableDescriptor.java   | 37 +++++++++------
 .../table/hybrid/BaseHybridTableDescriptor.java | 50 ++++++++++++++++++++
 .../samza/table/caching/TestCachingTable.java   | 22 ++++++---
 .../kv/LocalStoreBackedReadWriteTable.java      |  1 +
 .../samza/test/table/TestRemoteTable.java       | 33 ++++++-------
 6 files changed, 112 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
index ae7a45d..d50b0d0 100644
--- a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -54,6 +55,7 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.hybrid.BaseHybridTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,6 +173,12 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
 
   @Override
   public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
+
+    if (tableDescriptor instanceof BaseHybridTableDescriptor) {
+      List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
+      tableDescs.forEach(td -> getTable(td));
+    }
+
     String tableId = tableDescriptor.getTableId();
     Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
         String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));

http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
index a1accd8..4896e93 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableDescriptor.java
@@ -20,28 +20,30 @@
 package org.apache.samza.table.caching;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.samza.operators.BaseTableDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.TableImpl;
-import org.apache.samza.table.Table;
+import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.hybrid.BaseHybridTableDescriptor;
 
 import com.google.common.base.Preconditions;
 
+
 /**
  * Table descriptor for {@link CachingTable}.
  * @param <K> type of the key in the cache
  * @param <V> type of the value in the cache
  */
-public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
+public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
   private Duration readTtl;
   private Duration writeTtl;
   private long cacheSize;
-  private Table<KV<K, V>> cache;
-  private Table<KV<K, V>> table;
+  private TableDescriptor<K, V, ?> cache;
+  private TableDescriptor<K, V, ?> table;
   private boolean isWriteAround;
 
   /**
@@ -52,6 +54,13 @@ public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Cach
   }
 
   @Override
+  public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
+    return cache != null
+        ? Arrays.asList(cache, table)
+        : Arrays.asList(table);
+  }
+
+  @Override
   public TableSpec getTableSpec() {
     validate();
 
@@ -59,7 +68,7 @@ public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Cach
     generateTableSpecConfig(tableSpecConfig);
 
     if (cache != null) {
-      tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((TableImpl) cache).getTableSpec().getId());
+      tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
     } else {
       if (readTtl != null) {
         tableSpecConfig.put(CachingTableProvider.READ_TTL_MS, String.valueOf(readTtl.toMillis()));
@@ -72,31 +81,31 @@ public class CachingTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Cach
       }
     }
 
-    tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((TableImpl) table).getTableSpec().getId());
+    tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
     tableSpecConfig.put(CachingTableProvider.WRITE_AROUND, String.valueOf(isWriteAround));
 
     return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig);
   }
 
   /**
-   * Specify a cache instance (as Table abstraction) to be used for caching.
+   * Specify a cache (as Table descriptor) to be used for caching.
    * Cache get is not synchronized with put for better parallelism in the read path
    * of {@link CachingTable}. As such, cache table implementation is expected to be
    * thread-safe for concurrent accesses.
-   * @param cache cache instance
+   * @param cache cache table descriptor
    * @return this descriptor
    */
-  public CachingTableDescriptor withCache(Table<KV<K, V>> cache) {
+  public CachingTableDescriptor withCache(TableDescriptor<K, V, ?> cache) {
     this.cache = cache;
     return this;
   }
 
   /**
-   * Specify the table instance for the actual table input/output.
-   * @param table table instance
+   * Specify the target table descriptor for the actual table input/output.
+   * @param table the target table descriptor
    * @return this descriptor
    */
-  public CachingTableDescriptor withTable(Table<KV<K, V>> table) {
+  public CachingTableDescriptor withTable(TableDescriptor<K, V, ?> table) {
     this.table = table;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java
new file mode 100644
index 0000000..48efd0c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/hybrid/BaseHybridTableDescriptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.hybrid;
+
+import java.util.List;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.TableDescriptor;
+
+/**
+ * Base class for hybrid table descriptors. A hybrid table consists of one or more
+ * table descriptors, and it orchestrates operations between them to achieve more advanced
+ * functionality.
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ * @param <D> the type of this table descriptor
+ */
+abstract public class BaseHybridTableDescriptor<K, V, D extends BaseHybridTableDescriptor<K, V, D>>
+    extends BaseTableDescriptor<K, V, D> {
+
+  /**
+   * Constructs a hybrid table descriptor with the given table Id.
+   */
+  public BaseHybridTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Get the descriptors of the tables contained within this hybrid table.
+   * @return list of table descriptors
+   */
+  abstract public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 49c72dc..ec1c915 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -36,11 +36,11 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.TableImpl;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.caching.guava.GuavaCacheTableDescriptor;
@@ -79,12 +79,12 @@ public class TestCachingTable {
     guavaTableDesc.withCache(CacheBuilder.newBuilder().build());
     TableSpec spec = guavaTableDesc.getTableSpec();
     Assert.assertTrue(spec.getConfig().containsKey(GuavaCacheTableProvider.GUAVA_CACHE));
-    doTestSerialize(new TableImpl(guavaTableDesc.getTableSpec()));
+    doTestSerialize(guavaTableDesc);
   }
 
-  private void doTestSerialize(Table cache) {
+  private void doTestSerialize(TableDescriptor cache) {
     CachingTableDescriptor desc = new CachingTableDescriptor("1");
-    desc.withTable(new TableImpl(new TableSpec("2", null, null, new HashMap<>())));
+    desc.withTable(createDummyTableDescriptor("2"));
     if (cache == null) {
       desc.withReadTtl(Duration.ofMinutes(3));
       desc.withWriteTtl(Duration.ofMinutes(3));
@@ -153,8 +153,8 @@ public class TestCachingTable {
 
   private void doTestCacheOps(boolean isWriteAround) {
     CachingTableDescriptor desc = new CachingTableDescriptor("1");
-    desc.withTable(new TableImpl(new TableSpec("realTable", null, null, new HashMap<>())));
-    desc.withCache(new TableImpl(new TableSpec("cacheTable", null, null, new HashMap<>())));
+    desc.withTable(createDummyTableDescriptor("realTable"));
+    desc.withCache(createDummyTableDescriptor("cacheTable"));
     if (isWriteAround) {
       desc.withWriteAround();
     }
@@ -363,4 +363,12 @@ public class TestCachingTable {
     Assert.assertNull(guavaCache.getIfPresent("foo1"));
     Assert.assertNull(guavaCache.getIfPresent("foo3"));
   }
+
+  private TableDescriptor createDummyTableDescriptor(String tableId) {
+    BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class);
+    when(tableDescriptor.getTableId()).thenReturn(tableId);
+    when(tableDescriptor.getTableSpec()).thenReturn(
+        new TableSpec(tableId, null, null, new HashMap<>()));
+    return tableDescriptor;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
index 98c3e3c..9eeb55e 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
@@ -40,6 +40,7 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
 
   /**
    * Constructs an instance of {@link LocalStoreBackedReadWriteTable}
+   * @param tableId the table Id
    * @param kvStore the backing store
    */
   public LocalStoreBackedReadWriteTable(String tableId, KeyValueStore kvStore) {

http://git-wip-us.apache.org/repos/asf/samza/blob/db6996ed/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index 4cf99ff..e23cb58 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -42,6 +42,7 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.runtime.LocalApplicationRunner;
@@ -67,6 +68,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
 
 
 public class TestRemoteTable extends AbstractIntegrationTestHarness {
@@ -136,19 +138,18 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     }
   }
 
-  private <K, V> Table<KV<K, V>> getCachingTable(Table<KV<K, V>> actualTable, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
+  private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
     CachingTableDescriptor<K, V> cachingDesc = new CachingTableDescriptor<>("caching-table-" + id);
     if (defaultCache) {
       cachingDesc.withReadTtl(Duration.ofMinutes(5));
       cachingDesc.withWriteTtl(Duration.ofMinutes(5));
     } else {
-      GuavaCacheTableDescriptor<K, V> guavaDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
-      guavaDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
-      Table<KV<K, V>> guavaTable = appDesc.getTable(guavaDesc);
-      cachingDesc.withCache(guavaTable);
+      GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
+      guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
+      cachingDesc.withCache(guavaTableDesc);
     }
 
-    cachingDesc.withTable(actualTable);
+    cachingDesc.withTable(actualTableDesc);
     return appDesc.getTable(cachingDesc);
   }
 
@@ -180,8 +181,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
     configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
 
-    final RateLimiter readRateLimiter = mock(RateLimiter.class);
-    final RateLimiter writeRateLimiter = mock(RateLimiter.class);
+    final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+    final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
     final StreamApplication app = appDesc -> {
       RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
       inputTableDesc
@@ -197,17 +198,13 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
           .withWriteFunction(writer)
           .withRateLimiter(writeRateLimiter, null, null);
 
-      Table<KV<Integer, EnrichedPageView>> outputTable = appDesc.getTable(outputTableDesc);
+      Table<KV<Integer, EnrichedPageView>> outputTable = withCache
+          ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc)
+          : appDesc.getTable(outputTableDesc);
 
-      if (withCache) {
-        outputTable = getCachingTable(outputTable, defaultCache, "output", appDesc);
-      }
-
-      Table<KV<Integer, Profile>> inputTable = appDesc.getTable(inputTableDesc);
-
-      if (withCache) {
-        inputTable = getCachingTable(inputTable, defaultCache, "input", appDesc);
-      }
+      Table<KV<Integer, Profile>> inputTable = withCache
+          ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc)
+          : appDesc.getTable(inputTableDesc);
 
       DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
       GenericInputDescriptor<TestTableData.PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());