You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/12/17 23:11:34 UTC
[1/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and
ReadWriteTable
Repository: samza
Updated Branches:
refs/heads/master c5348bf6b -> 6a75503d7
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
new file mode 100644
index 0000000..e38af39
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import com.google.common.cache.CacheBuilder;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
+import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
+import org.apache.samza.table.remote.RemoteTable;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.generatePageViews;
+import static org.apache.samza.test.table.TestTableData.generateProfiles;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+
+
+public class TestRemoteTableEndToEnd extends AbstractIntegrationTestHarness {
+
+ // Records captured by InMemoryWriteFunction, keyed by test name, so each test can verify what was written.
+ static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+
+ /**
+  * Read function that serves profiles from an in-memory map keyed by member id.
+  * The profiles are carried as a serialized string; the lookup map is transient and is
+  * only rebuilt in readObject(), i.e. when an instance is deserialized.
+  */
+ static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
+   private final String serializedProfiles;
+   private transient Map<Integer, Profile> profileMap;
+
+   private InMemoryReadFunction(String profiles) {
+     this.serializedProfiles = profiles;
+   }
+
+   /** Factory for a reader over the given serialized profiles. */
+   static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
+     return new InMemoryReadFunction(serializedProfiles);
+   }
+
+   // Java deserialization hook: restore the default state, then rebuild the transient lookup map.
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+     in.defaultReadObject();
+     Profile[] decoded = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
+     Map<Integer, Profile> byMemberId = Arrays.stream(decoded)
+         .collect(Collectors.toMap(Profile::getMemberId, Function.identity()));
+     this.profileMap = byMemberId;
+   }
+
+   /** Returns an already-completed future holding the profile mapped to the key, or null if there is none. */
+   @Override
+   public CompletableFuture<Profile> getAsync(Integer key) {
+     Profile match = profileMap.get(key);
+     return CompletableFuture.completedFuture(match);
+   }
+
+   /** No failure is treated as retriable. */
+   @Override
+   public boolean isRetriable(Throwable exception) {
+     return false;
+   }
+ }
+
+ /**
+  * Write function that appends every record it receives to the shared writtenRecords
+  * list registered under the test name, so tests can verify what was written.
+  * The target list is resolved in readObject(), i.e. when an instance is deserialized.
+  */
+ static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
+   private transient List<EnrichedPageView> records;
+   private String testName;
+
+   public InMemoryWriteFunction(String testName) {
+     this.testName = testName;
+   }
+
+   // Verify serializable functionality
+   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+     in.defaultReadObject();
+
+     // Write to the global list for verification
+     records = writtenRecords.get(testName);
+   }
+
+   /** Appends the record to the captured list; the key is not used. */
+   @Override
+   public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
+     records.add(record);
+     return CompletableFuture.completedFuture(null);
+   }
+
+   /** Removes every captured record whose member id equals the key. */
+   @Override
+   public CompletableFuture<Void> deleteAsync(Integer key) {
+     // records.remove(key) resolved to List.remove(Object) with an Integer argument, which can never
+     // equal an EnrichedPageView element, so nothing was ever removed. Match on the member id instead;
+     // the test pipeline keys each record by getMemberId() before sending it to this table.
+     records.removeIf(record -> key.equals(record.getMemberId()));
+     return CompletableFuture.completedFuture(null);
+   }
+
+   /** No failure is treated as retriable. */
+   @Override
+   public boolean isRetriable(Throwable exception) {
+     return false;
+   }
+ }
+
+ /**
+  * Wraps the given table descriptor in a caching table descriptor and obtains the table from the application.
+  * With defaultCache the caching descriptor is created without an explicit cache table and is given
+  * 5-minute read and write TTLs; otherwise it is backed by a Guava cache table whose entries expire
+  * 5 minutes after access.
+  *
+  * @param actualTableDesc descriptor of the table to be cached
+  * @param defaultCache whether to use the TTL-configured variant instead of an explicit Guava cache table
+  * @param id suffix used to build the caching and Guava table ids
+  * @param appDesc application descriptor the table is obtained from
+  * @return the caching table
+  */
+ private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
+   final String cachingTableId = "caching-table-" + id;
+
+   if (defaultCache) {
+     CachingTableDescriptor<K, V> ttlDesc = new CachingTableDescriptor<>(cachingTableId, actualTableDesc);
+     ttlDesc.withReadTtl(Duration.ofMinutes(5));
+     ttlDesc.withWriteTtl(Duration.ofMinutes(5));
+     return appDesc.getTable(ttlDesc);
+   }
+
+   GuavaCacheTableDescriptor<K, V> guavaDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
+   guavaDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
+   CachingTableDescriptor<K, V> guavaBackedDesc =
+       new CachingTableDescriptor<>(cachingTableId, actualTableDesc, guavaDesc);
+   return appDesc.getTable(guavaBackedDesc);
+ }
+
+ /**
+  * Placeholder read function, used where a table descriptor needs a reader but the test only writes.
+  */
+ static class MyReadFunction implements TableReadFunction {
+   /** Returns an already-completed future holding null for every key. */
+   @Override
+   public CompletableFuture getAsync(Object key) {
+     // Hand back a completed future rather than a null reference, so that a caller chaining on the
+     // result of an unexpected read does not fail with a NullPointerException.
+     return CompletableFuture.completedFuture(null);
+   }
+
+   /** No failure is treated as retriable. */
+   @Override
+   public boolean isRetriable(Throwable exception) {
+     return false;
+   }
+ }
+
+ /**
+  * Runs a job that joins the PageView input stream with a remote profile table and sends the joined
+  * records to a remote output table, then checks how many records the writer captured.
+  *
+  * @param withCache whether both tables are wrapped through getCachingTable
+  * @param defaultCache passed through to getCachingTable when withCache is set
+  * @param testName key under which the written records are collected in writtenRecords
+  * @throws Exception if setting up or running the job fails
+  */
+ private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
+ final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
+
+ // Register the list for this test; the writer looks it up by test name when it is deserialized.
+ writtenRecords.put(testName, new ArrayList<>());
+
+ int count = 10;
+ PageView[] pageViews = generatePageViews(count);
+ String profiles = Base64Serializer.serialize(generateProfiles(count));
+
+ int partitionCount = 4;
+ Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+ // The PageView stream lives on the "test" system and carries the serialized page views as its source.
+ configs.put("streams.PageView.samza.system", "test");
+ configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+ configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+
+ // NOTE(review): the mocks are created serializable, presumably because the descriptors holding them
+ // are serialized - confirm.
+ final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+ final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+ final StreamApplication app = appDesc -> {
+ // Input side: remote table of profiles served by the in-memory reader.
+ RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
+ inputTableDesc
+ .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
+ .withRateLimiter(readRateLimiter, null, null);
+
+ // dummy reader
+ TableReadFunction readFn = new MyReadFunction();
+
+ // Output side: remote table whose writes are captured by the in-memory writer.
+ RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
+ outputTableDesc
+ .withReadFunction(readFn)
+ .withWriteFunction(writer)
+ .withRateLimiter(writeRateLimiter, null, null);
+
+ Table<KV<Integer, EnrichedPageView>> outputTable = withCache
+ ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc)
+ : appDesc.getTable(outputTableDesc);
+
+ Table<KV<Integer, Profile>> inputTable = withCache
+ ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc)
+ : appDesc.getTable(inputTableDesc);
+
+ // Key each page view by member id, join it with the profile table, re-key the joined record by
+ // member id and send it to the output table.
+ DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+ GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+ appDesc.getInputStream(isd)
+ .map(pv -> new KV<>(pv.getMemberId(), pv))
+ .join(inputTable, new PageViewToProfileJoinFunction())
+ .map(m -> new KV(m.getMemberId(), m))
+ .sendTo(outputTable);
+ };
+
+ Config config = new MapConfig(configs);
+ final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+ executeRun(runner, config);
+ runner.waitForFinish();
+
+ // NOTE(review): count * partitionCount records are expected, presumably because each partition is fed
+ // the full set of page views - confirm against the test system.
+ int numExpected = count * partitionCount;
+ Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
+ Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
+ }
+
+ /** Stream-table join against plain remote tables, without any caching layer. */
+ @Test
+ public void testStreamTableJoinRemoteTable() throws Exception {
+   final boolean withCache = false;
+   final boolean defaultCache = false;
+   doTestStreamTableJoinRemoteTable(withCache, defaultCache, "testStreamTableJoinRemoteTable");
+ }
+
+ /** Stream-table join with both remote tables wrapped in the non-default caching variant. */
+ @Test
+ public void testStreamTableJoinRemoteTableWithCache() throws Exception {
+   final boolean withCache = true;
+   final boolean defaultCache = false;
+   doTestStreamTableJoinRemoteTable(withCache, defaultCache, "testStreamTableJoinRemoteTableWithCache");
+ }
+
+ /** Stream-table join with both remote tables wrapped in the default caching variant. */
+ @Test
+ public void testStreamTableJoinRemoteTableWithDefaultCache() throws Exception {
+   final boolean withCache = true;
+   final boolean defaultCache = true;
+   doTestStreamTableJoinRemoteTable(withCache, defaultCache, "testStreamTableJoinRemoteTableWithDefaultCache");
+ }
+
+ /**
+  * Builds a MockContext whose job context returns an empty config and whose container context
+  * returns a mocked metrics registry that hands out a stub counter and a stub timer.
+  */
+ private Context createMockContext() {
+   Context mockContext = new MockContext();
+   doReturn(new MapConfig()).when(mockContext.getJobContext()).getConfig();
+
+   MetricsRegistry registry = mock(MetricsRegistry.class);
+   Counter stubCounter = new Counter("");
+   Timer stubTimer = new Timer("");
+   doReturn(stubCounter).when(registry).newCounter(anyString(), anyString());
+   doReturn(stubTimer).when(registry).newTimer(anyString(), anyString());
+   doReturn(registry).when(mockContext.getContainerContext()).getContainerMetricsRegistry();
+
+   return mockContext;
+ }
+
+ /**
+  * A read whose future completed exceptionally must surface from get() as a SamzaException.
+  */
+ @Test(expected = SamzaException.class)
+ public void testCatchReaderException() {
+   TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
+   CompletableFuture<String> future = new CompletableFuture<>();
+   future.completeExceptionally(new RuntimeException("Expected test exception"));
+   doReturn(future).when(reader).getAsync(anyString());
+   TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
+   // Keep a handle on the executor so its worker thread can be released once the test is over.
+   ExecutorService executor = Executors.newSingleThreadExecutor();
+   try {
+     RemoteTable<String, String> table = new RemoteTable<>("table1", reader, null,
+         rateLimitHelper, null, executor, null);
+     table.init(createMockContext());
+     table.get("abc");
+   } finally {
+     // Runs even though get() is expected to throw.
+     executor.shutdown();
+   }
+ }
+
+ /**
+  * A write whose future completed exceptionally must surface from put() as a SamzaException.
+  */
+ @Test(expected = SamzaException.class)
+ public void testCatchWriterException() {
+   TableReadFunction<String, String> reader = mock(TableReadFunction.class);
+   TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
+   // Typed as Void to match what a write function's putAsync returns.
+   CompletableFuture<Void> future = new CompletableFuture<>();
+   future.completeExceptionally(new RuntimeException("Expected test exception"));
+   doReturn(future).when(writer).putAsync(anyString(), any());
+   TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
+   // Keep a handle on the executor so its worker thread can be released once the test is over.
+   ExecutorService executor = Executors.newSingleThreadExecutor();
+   try {
+     RemoteTable<String, String> table = new RemoteTable<String, String>(
+         "table1", reader, writer, rateLimitHelper, rateLimitHelper, executor, null);
+     table.init(createMockContext());
+     table.put("abc", "efg");
+   } finally {
+     // Runs even though put() is expected to throw.
+     executor.shutdown();
+   }
+ }
+
+ /**
+  * A table created without a write function must reject both put() and delete() with a
+  * SamzaException, while flush() and close() still complete.
+  */
+ @Test
+ public void testUninitializedWriter() {
+   TableReadFunction<String, String> reader = mock(TableReadFunction.class);
+   TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
+   // Keep a handle on the executor so its worker thread can be released once the test is over.
+   ExecutorService executor = Executors.newSingleThreadExecutor();
+   try {
+     RemoteTable<String, String> table = new RemoteTable<String, String>(
+         "table1", reader, null, rateLimitHelper, null, executor, null);
+     table.init(createMockContext());
+     int failureCount = 0;
+     try {
+       table.put("abc", "efg");
+     } catch (SamzaException ex) {
+       ++failureCount;
+     }
+     try {
+       table.delete("abc");
+     } catch (SamzaException ex) {
+       ++failureCount;
+     }
+     table.flush();
+     table.close();
+     // One failure each for put and delete.
+     Assert.assertEquals(2, failureCount);
+   } finally {
+     // Also covers the paths where an assertion or an unexpected exception ends the test early.
+     executor.shutdown();
+   }
+ }
+}
[4/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and
ReadWriteTable
Posted by we...@apache.org.
SAMZA-2043: Consolidate ReadableTable and ReadWriteTable
So far we have not seen much value in maintaining separate implementations for ReadableTable and ReadWriteTable, which adds quite a bit of complexity. Hence this change consolidates them.
Author: Wei Song <ws...@linkedin.com>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #861 from weisong44/SAMZA-2043
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6a75503d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6a75503d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6a75503d
Branch: refs/heads/master
Commit: 6a75503d74ae65b30e2dcf760bba6e1d8050cdba
Parents: c5348bf
Author: Wei Song <ws...@linkedin.com>
Authored: Mon Dec 17 15:11:27 2018 -0800
Committer: Wei Song <ws...@linkedin.com>
Committed: Mon Dec 17 15:11:27 2018 -0800
----------------------------------------------------------------------
.../org/apache/samza/context/TaskContext.java | 16 +-
.../org/apache/samza/table/ReadWriteTable.java | 53 ++-
.../org/apache/samza/table/ReadableTable.java | 86 ----
.../main/java/org/apache/samza/table/Table.java | 2 -
.../org/apache/samza/table/TableProvider.java | 4 +-
.../table/descriptors/BaseTableDescriptor.java | 1 +
.../table/descriptors/LocalTableDescriptor.java | 1 +
.../descriptors/RemoteTableDescriptor.java | 1 +
.../apache/samza/table/utils/SerdeUtils.java | 1 +
.../table/remote/TestTableRateLimiter.java | 2 +-
.../apache/samza/context/TaskContextImpl.java | 4 +-
.../operators/impl/SendToTableOperatorImpl.java | 2 +-
.../impl/StreamTableJoinOperatorImpl.java | 6 +-
.../apache/samza/table/BaseReadWriteTable.java | 69 +++
.../apache/samza/table/BaseReadableTable.java | 75 ---
.../org/apache/samza/table/TableManager.java | 4 +-
.../samza/table/caching/CachingTable.java | 108 ++---
.../table/caching/CachingTableProvider.java | 8 +-
.../table/caching/guava/GuavaCacheTable.java | 4 +-
.../caching/guava/GuavaCacheTableProvider.java | 4 +-
.../table/remote/RemoteReadWriteTable.java | 244 ----------
.../samza/table/remote/RemoteReadableTable.java | 246 ----------
.../apache/samza/table/remote/RemoteTable.java | 436 ++++++++++++++++++
.../samza/table/remote/RemoteTableProvider.java | 68 ++-
.../apache/samza/table/utils/TableMetrics.java | 77 ++++
.../samza/table/utils/TableReadMetrics.java | 54 ---
.../samza/table/utils/TableWriteMetrics.java | 60 ---
.../impl/TestStreamTableJoinOperatorImpl.java | 4 +-
.../apache/samza/table/TestTableManager.java | 4 +-
.../samza/table/caching/TestCachingTable.java | 15 +-
.../descriptors/TestLocalTableDescriptor.java | 4 +-
.../table/remote/TestRemoteReadWriteTable.java | 458 -------------------
.../samza/table/remote/TestRemoteTable.java | 456 ++++++++++++++++++
.../descriptors/TestRemoteTableDescriptor.java | 8 +-
.../retry/TestRetriableTableFunctions.java | 4 +-
.../inmemory/TestInMemoryTableDescriptor.java | 4 +-
.../descriptors/TestRocksDbTableDescriptor.java | 4 +-
.../samza/storage/kv/LocalReadWriteTable.java | 154 -------
.../samza/storage/kv/LocalReadableTable.java | 108 -----
.../org/apache/samza/storage/kv/LocalTable.java | 213 +++++++++
.../samza/storage/kv/LocalTableProvider.java | 7 +-
.../storage/kv/TestLocalReadWriteTable.java | 247 ----------
.../storage/kv/TestLocalReadableTable.java | 155 -------
.../storage/kv/TestLocalTableProvider.java | 2 +-
.../samza/storage/kv/TestLocalTableRead.java | 155 +++++++
.../samza/storage/kv/TestLocalTableWrite.java | 247 ++++++++++
.../framework/StreamTaskIntegrationTest.java | 2 +-
.../apache/samza/test/table/TestLocalTable.java | 362 ---------------
.../test/table/TestLocalTableEndToEnd.java | 361 +++++++++++++++
.../samza/test/table/TestRemoteTable.java | 288 ------------
.../test/table/TestRemoteTableEndToEnd.java | 310 +++++++++++++
51 files changed, 2509 insertions(+), 2699 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
index cdf7404..8adfcea 100644
--- a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
@@ -25,8 +25,6 @@ import org.apache.samza.scheduler.CallbackScheduler;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
/**
@@ -63,17 +61,15 @@ public interface TaskContext {
KeyValueStore<?, ?> getStore(String storeName);
/**
- * Gets the {@link Table} corresponding to the {@code tableId} for this task.
+ * Gets the {@link ReadWriteTable} corresponding to the {@code tableId} for this task.
*
- * The returned table should be cast with the concrete type parameters based on the configured table serdes, and
- * whether it is {@link ReadWriteTable} or {@link ReadableTable}. E.g., if using string key and integer value
- * serde for a writable table, it should be cast to a {@code ReadWriteTable<String, Integer>}.
- *
- * @param tableId id of the {@link Table} to get
- * @return the {@link Table} associated with {@code tableId} for this task
+ * @param tableId id of the {@link ReadWriteTable} to get
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @return the {@link ReadWriteTable} associated with {@code tableId} for this task
* @throws IllegalArgumentException if there is no table associated with {@code tableId}
*/
- Table<?> getTable(String tableId);
+ <K, V> ReadWriteTable<K, V> getTable(String tableId);
/**
* Gets the {@link CallbackScheduler} for this task, which can be used to schedule a callback to be executed
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
index 083a1b5..ffb87a4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
+++ b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
@@ -19,9 +19,11 @@
package org.apache.samza.table;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
/**
@@ -32,7 +34,51 @@ import org.apache.samza.storage.kv.Entry;
* @param <V> the type of the value in this table
*/
@InterfaceStability.Unstable
-public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
+public interface ReadWriteTable<K, V> extends Table {
+
+ /**
+ * Initializes the table during container initialization.
+ * Guaranteed to be invoked as the first operation on the table.
+ * @param context {@link Context} corresponding to this table
+ */
+ default void init(Context context) {
+ }
+
+ /**
+ * Gets the value associated with the specified {@code key}.
+ *
+ * @param key the key with which the associated value is to be fetched.
+ * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
+ * @throws NullPointerException if the specified {@code key} is {@code null}.
+ */
+ V get(K key);
+
+ /**
+ * Asynchronously gets the value associated with the specified {@code key}.
+ *
+ * @param key the key with which the associated value is to be fetched.
+ * @return completableFuture for the requested value
+ * @throws NullPointerException if the specified {@code key} is {@code null}.
+ */
+ CompletableFuture<V> getAsync(K key);
+
+ /**
+ * Gets the values with which the specified {@code keys} are associated.
+ *
+ * @param keys the keys with which the associated values are to be fetched.
+ * @return a map of the keys that were found and their respective values.
+ * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+ */
+ Map<K, V> getAll(List<K> keys);
+
+ /**
+ * Asynchronously gets the values with which the specified {@code keys} are associated.
+ *
+ * @param keys the keys with which the associated values are to be fetched.
+ * @return completableFuture for the requested entries
+ * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+ */
+ CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
/**
* Updates the mapping of the specified key-value pair;
@@ -114,4 +160,9 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
* Flushes the underlying store of this table, if applicable.
*/
void flush();
+
+ /**
+ * Close the table and release any resources acquired
+ */
+ void close();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
deleted file mode 100644
index 6c88fd3..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.KV;
-
-
-/**
- *
- * A table that supports get by one or more keys
- *
- * @param <K> the type of the record key in this table
- * @param <V> the type of the record value in this table
- */
-@InterfaceStability.Unstable
-public interface ReadableTable<K, V> extends Table<KV<K, V>> {
- /**
- * Initializes the table during container initialization.
- * Guaranteed to be invoked as the first operation on the table.
- * @param context {@link Context} corresponding to this table
- */
- default void init(Context context) {
- }
-
- /**
- * Gets the value associated with the specified {@code key}.
- *
- * @param key the key with which the associated value is to be fetched.
- * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
- * @throws NullPointerException if the specified {@code key} is {@code null}.
- */
- V get(K key);
-
- /**
- * Asynchronously gets the value associated with the specified {@code key}.
- *
- * @param key the key with which the associated value is to be fetched.
- * @return completableFuture for the requested value
- * @throws NullPointerException if the specified {@code key} is {@code null}.
- */
- CompletableFuture<V> getAsync(K key);
-
- /**
- * Gets the values with which the specified {@code keys} are associated.
- *
- * @param keys the keys with which the associated values are to be fetched.
- * @return a map of the keys that were found and their respective values.
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- Map<K, V> getAll(List<K> keys);
-
- /**
- * Asynchronously gets the values with which the specified {@code keys} are associated.
- *
- * @param keys the keys with which the associated values are to be fetched.
- * @return completableFuture for the requested entries
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
-
- /**
- * Close the table and release any resources acquired
- */
- void close();
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/Table.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java b/samza-api/src/main/java/org/apache/samza/table/Table.java
index 234d15b..c454012 100644
--- a/samza-api/src/main/java/org/apache/samza/table/Table.java
+++ b/samza-api/src/main/java/org/apache/samza/table/Table.java
@@ -36,8 +36,6 @@ import org.apache.samza.task.InitableTask;
* hybrid tables. For remote data sources, a {@code RemoteTable} provides optimized access with caching, rate-limiting,
* and retry support.
* <p>
- * Depending on the implementation, a {@link Table} can be a {@link ReadableTable} or a {@link ReadWriteTable}.
- * <p>
* Use a {@link TableDescriptor} to specify the properties of a {@link Table}. For High Level API
* {@link StreamApplication}s, use {@link StreamApplicationDescriptor#getTable} to obtain the {@link Table} instance for
* the descriptor that can be used with the {@link MessageStream} operators like {@link MessageStream#sendTo(Table)}.
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index 2dec989..36cad2e 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -34,10 +34,10 @@ public interface TableProvider {
void init(Context context);
/**
- * Get an instance of the table for read/write operations
+ * Get an instance of the {@link ReadWriteTable}
* @return the underlying table
*/
- Table getTable();
+ ReadWriteTable getTable();
/**
* Shutdown the underlying table
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
index 26c2ae3..52eca5f 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
@@ -56,6 +56,7 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
* @param value the value
* @return this table descriptor instance
*/
+ @SuppressWarnings("unchecked")
public D withConfig(String key, String value) {
config.put(key, value);
return (D) this;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index 1ebb580..1623710 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -39,6 +39,7 @@ import org.apache.samza.table.utils.SerdeUtils;
* @param <V> the type of the value in this table
* @param <D> the type of the concrete table descriptor
*/
+@SuppressWarnings("unchecked")
abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>>
extends BaseTableDescriptor<K, V, D> {
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 7286004..4b15c47 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -235,6 +235,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
if (!tagCreditsMap.isEmpty()) {
RateLimiter defaultRateLimiter;
try {
+ @SuppressWarnings("unchecked")
Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
defaultRateLimiter = ctor.newInstance(tagCreditsMap);
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
index a7b66e5..338baf4 100644
--- a/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
+++ b/samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
@@ -54,6 +54,7 @@ public final class SerdeUtils {
* @return deserialized object instance
* @param <T> type of the object
*/
+ @SuppressWarnings("unchecked")
public static <T> T deserialize(String name, String strObject) {
try {
byte [] bytes = Base64.getDecoder().decode(strObject);
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
index ea9acbd..3235d5a 100644
--- a/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
+++ b/samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
@@ -25,9 +25,9 @@ import java.util.Collections;
import org.apache.samza.metrics.Timer;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
import org.junit.Test;
-import junit.framework.Assert;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyMap;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index ec52f8a..a29c2b3 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -26,7 +26,7 @@ import org.apache.samza.scheduler.CallbackScheduler;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.TableManager;
import java.util.HashMap;
@@ -83,7 +83,7 @@ public class TaskContextImpl implements TaskContext {
}
@Override
- public Table getTable(String tableId) {
+ public <K, V> ReadWriteTable<K, V> getTable(String tableId) {
return this.tableManager.getTable(tableId);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
index 0d39c1b..6d84b17 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
@@ -44,7 +44,7 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void>
SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) {
this.sendToTableOpSpec = sendToTableOpSpec;
- this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
+ this.table = context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
index 96f07d1..e3fc266 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
@@ -22,7 +22,7 @@ import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
@@ -42,11 +42,11 @@ import java.util.Collections;
class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M, JM> {
private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
- private final ReadableTable<K, ?> table;
+ private final ReadWriteTable<K, ?> table;
StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) {
this.joinOpSpec = joinOpSpec;
- this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableId());
+ this.table = context.getTaskContext().getTable(joinOpSpec.getTableId());
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
new file mode 100644
index 0000000..cef224d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.table.utils.TableMetrics;
+import org.apache.samza.util.HighResolutionClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for a concrete table implementation; holds the table Id, logger, metrics and clock
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+abstract public class BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> {
+ // Logger named "<concrete class name>.<tableId>", created in the constructor
+ protected final Logger logger;
+ // Id of this table; validated as non-null and non-empty by the constructor
+ protected final String tableId;
+ // Created in init(Context); not assigned before init is called
+ protected TableMetrics metrics;
+ // Assigned in init(Context): System.nanoTime() if metrics timers are enabled, otherwise always 0
+ protected HighResolutionClock clock;
+
+ /**
+ * Construct an instance; the Id check short-circuits so a null Id fails validation instead of an NPE
+ * @param tableId Id of the table; a null or empty Id is rejected with IllegalArgumentException
+ */
+ public BaseReadWriteTable(String tableId) {
+ Preconditions.checkArgument(tableId != null && !tableId.isEmpty(),
+ String.format("Invalid table Id: %s", tableId));
+ this.tableId = tableId;
+ this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId);
+ }
+ /** Sets up the clock according to the metrics timer config of the job, then creates the table metrics. */
+ @Override
+ public void init(Context context) {
+ MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+ clock = metricsConfig.getMetricsTimerEnabled()
+ ? () -> System.nanoTime()
+ : () -> 0L;
+ metrics = new TableMetrics(context, this, tableId);
+ }
+ /** @return the Id of this table, as passed to the constructor */
+ public String getTableId() {
+ return tableId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
deleted file mode 100644
index 1dfd54c..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import com.google.common.base.Preconditions;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.utils.TableReadMetrics;
-import org.apache.samza.table.utils.TableWriteMetrics;
-import org.apache.samza.util.HighResolutionClock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Base class for all readable tables
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
-
- protected final Logger logger;
-
- protected final String tableId;
-
- protected TableReadMetrics readMetrics;
- protected TableWriteMetrics writeMetrics;
-
- protected HighResolutionClock clock;
-
- /**
- * Construct an instance
- * @param tableId Id of the table
- */
- public BaseReadableTable(String tableId) {
- Preconditions.checkArgument(tableId != null & !tableId.isEmpty(),
- String.format("Invalid table Id: %s", tableId));
- this.tableId = tableId;
- this.logger = LoggerFactory.getLogger(getClass().getName() + "." + tableId);
- }
-
- @Override
- public void init(Context context) {
- MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
- clock = metricsConfig.getMetricsTimerEnabled()
- ? () -> System.nanoTime()
- : () -> 0L;
-
- readMetrics = new TableReadMetrics(context, this, tableId);
- if (this instanceof ReadWriteTable) {
- writeMetrics = new TableWriteMetrics(context, this, tableId);
- }
- }
-
- public String getTableId() {
- return tableId;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/TableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
index d3ba771..5a3777e 100644
--- a/samza-core/src/main/java/org/apache/samza/table/TableManager.java
+++ b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
@@ -54,7 +54,7 @@ public class TableManager {
static class TableCtx {
private TableProvider tableProvider;
- private Table table;
+ private ReadWriteTable table;
}
private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName());
@@ -110,7 +110,7 @@ public class TableManager {
* @param tableId Id of the table
* @return table instance
*/
- public Table getTable(String tableId) {
+ public ReadWriteTable getTable(String tableId) {
Preconditions.checkState(initialized, "TableManager has not been initialized.");
TableCtx ctx = tableContexts.get(tableId);
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
index e63bf61..2fde79a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
@@ -23,9 +23,8 @@ import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.utils.TableMetricsUtil;
import java.util.ArrayList;
@@ -41,34 +40,32 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
/**
- * A composite table incorporating a cache with a Samza table. The cache is
+ * A hybrid table incorporating a cache with a Samza table. The cache is
* represented as a {@link ReadWriteTable}.
*
- * The intented use case is to optimize the latency of accessing the actual table, eg.
+ * The intended use case is to optimize the latency of accessing the actual table, e.g.
* remote tables, when eventual consistency between cache and table is acceptable.
* The cache is expected to support TTL such that the values can be refreshed at some
* point.
*
- * If the actual table is read-write table, CachingTable supports both write-through
- * and write-around (writes bypassing cache) policies. For write-through policy, it
- * supports read-after-write semantics because the value is cached after written to
- * the table.
+ * {@link CachingTable} supports write-through and write-around (writes bypassing cache) policies.
+ * For write-through policy, it supports read-after-write semantics because the value is
+ * cached after written to the table.
*
- * Note that there is no synchronization in CachingTable because it is impossible to
+ * Note that there is no synchronization in {@link CachingTable} because it is impossible to
* implement a critical section between table read/write and cache update in the async
* code paths without serializing all async operations for the same keys. Given stale
- * data is a presumed trade off for using a cache for table, it should be acceptable
- * for the data in table and cache are out-of-sync. Moreover, unsynchronized operations
- * in CachingTable also deliver higher performance when there is contention.
+ * data is a presumed trade-off for using a cache with table, it should be acceptable
+ * for the data in table and cache to be temporarily out-of-sync. Moreover, unsynchronized
+ * operations in {@link CachingTable} also deliver higher performance when there is contention.
*
* @param <K> type of the table key
* @param <V> type of the table value
*/
-public class CachingTable<K, V> extends BaseReadableTable<K, V>
+public class CachingTable<K, V> extends BaseReadWriteTable<K, V>
implements ReadWriteTable<K, V> {
- private final ReadableTable<K, V> rdTable;
- private final ReadWriteTable<K, V> rwTable;
+ private final ReadWriteTable<K, V> table;
private final ReadWriteTable<K, V> cache;
private final boolean isWriteAround;
@@ -76,10 +73,9 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
private AtomicLong hitCount = new AtomicLong();
private AtomicLong missCount = new AtomicLong();
- public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
+ public CachingTable(String tableId, ReadWriteTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
super(tableId);
- this.rdTable = table;
- this.rwTable = table instanceof ReadWriteTable ? (ReadWriteTable) table : null;
+ this.table = table;
this.cache = cache;
this.isWriteAround = isWriteAround;
}
@@ -114,16 +110,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
public V get(K key) {
try {
return getAsync(key).get();
- } catch (InterruptedException e) {
- throw new SamzaException(e);
} catch (Exception e) {
- throw (SamzaException) e.getCause();
+ throw new SamzaException(e);
}
}
@Override
public CompletableFuture<V> getAsync(K key) {
- incCounter(readMetrics.numGets);
+ incCounter(metrics.numGets);
V value = cache.get(key);
if (value != null) {
hitCount.incrementAndGet();
@@ -133,14 +127,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
long startNs = clock.nanoTime();
missCount.incrementAndGet();
- return rdTable.getAsync(key).handle((result, e) -> {
+ return table.getAsync(key).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to get the record for " + key, e);
} else {
if (result != null) {
cache.put(key, result);
}
- updateTimer(readMetrics.getNs, clock.nanoTime() - startNs);
+ updateTimer(metrics.getNs, clock.nanoTime() - startNs);
return result;
}
});
@@ -150,16 +144,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
public Map<K, V> getAll(List<K> keys) {
try {
return getAllAsync(keys).get();
- } catch (InterruptedException e) {
- throw new SamzaException(e);
} catch (Exception e) {
- throw (SamzaException) e.getCause();
+ throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
- incCounter(readMetrics.numGetAlls);
+ incCounter(metrics.numGetAlls);
// Make a copy of entries which might be immutable
Map<K, V> getAllResult = new HashMap<>();
List<K> missingKeys = lookupCache(keys, getAllResult);
@@ -169,7 +161,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
}
long startNs = clock.nanoTime();
- return rdTable.getAllAsync(missingKeys).handle((records, e) -> {
+ return table.getAllAsync(missingKeys).handle((records, e) -> {
if (e != null) {
throw new SamzaException("Failed to get records for " + keys, e);
} else {
@@ -179,7 +171,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
.collect(Collectors.toList()));
getAllResult.putAll(records);
}
- updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs);
+ updateTimer(metrics.getAllNs, clock.nanoTime() - startNs);
return getAllResult;
}
});
@@ -189,20 +181,18 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
public void put(K key, V value) {
try {
putAsync(key, value).get();
- } catch (InterruptedException e) {
- throw new SamzaException(e);
} catch (Exception e) {
- throw (SamzaException) e.getCause();
+ throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Void> putAsync(K key, V value) {
- incCounter(writeMetrics.numPuts);
- Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
+ incCounter(metrics.numPuts);
+ Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
long startNs = clock.nanoTime();
- return rwTable.putAsync(key, value).handle((result, e) -> {
+ return table.putAsync(key, value).handle((result, e) -> {
if (e != null) {
throw new SamzaException(String.format("Failed to put a record, key=%s, value=%s", key, value), e);
} else if (!isWriteAround) {
@@ -212,7 +202,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
cache.put(key, value);
}
}
- updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs);
+ updateTimer(metrics.putNs, clock.nanoTime() - startNs);
return result;
});
}
@@ -221,26 +211,24 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
public void putAll(List<Entry<K, V>> records) {
try {
putAllAsync(records).get();
- } catch (InterruptedException e) {
- throw new SamzaException(e);
} catch (Exception e) {
- throw (SamzaException) e.getCause();
+ throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
- incCounter(writeMetrics.numPutAlls);
+ incCounter(metrics.numPutAlls);
long startNs = clock.nanoTime();
- Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
- return rwTable.putAllAsync(records).handle((result, e) -> {
+ Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
+ return table.putAllAsync(records).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to put records " + records, e);
} else if (!isWriteAround) {
cache.putAll(records);
}
- updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs);
+ updateTimer(metrics.putAllNs, clock.nanoTime() - startNs);
return result;
});
}
@@ -249,25 +237,23 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
public void delete(K key) {
try {
deleteAsync(key).get();
- } catch (InterruptedException e) {
- throw new SamzaException(e);
} catch (Exception e) {
- throw (SamzaException) e.getCause();
+ throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Void> deleteAsync(K key) {
- incCounter(writeMetrics.numDeletes);
+ incCounter(metrics.numDeletes);
long startNs = clock.nanoTime();
- Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
- return rwTable.deleteAsync(key).handle((result, e) -> {
+ Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
+ return table.deleteAsync(key).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to delete the record for " + key, e);
} else if (!isWriteAround) {
cache.delete(key);
}
- updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs);
+ updateTimer(metrics.deleteNs, clock.nanoTime() - startNs);
return result;
});
}
@@ -283,33 +269,33 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
@Override
public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
- incCounter(writeMetrics.numDeleteAlls);
+ incCounter(metrics.numDeleteAlls);
long startNs = clock.nanoTime();
- Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
- return rwTable.deleteAllAsync(keys).handle((result, e) -> {
+ Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
+ return table.deleteAllAsync(keys).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to delete the record for " + keys, e);
} else if (!isWriteAround) {
cache.deleteAll(keys);
}
- updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs);
+ updateTimer(metrics.deleteAllNs, clock.nanoTime() - startNs);
return result;
});
}
@Override
public synchronized void flush() {
- incCounter(writeMetrics.numFlushes);
+ incCounter(metrics.numFlushes);
long startNs = clock.nanoTime();
- Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
- rwTable.flush();
- updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
+ Preconditions.checkNotNull(table, "Cannot flush a read-only table: " + table);
+ table.flush();
+ updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
}
@Override
public void close() {
- this.cache.close();
- this.rdTable.close();
+ cache.close();
+ table.close();
}
double hitRate() {
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
index d835809..e533cf4 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
@@ -25,8 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.caching.guava.GuavaCacheTable;
import org.apache.samza.table.BaseTableProvider;
@@ -47,18 +45,18 @@ public class CachingTableProvider extends BaseTableProvider {
}
@Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
String realTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.REAL_TABLE_ID);
- ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
+ ReadWriteTable table = this.context.getTaskContext().getTable(realTableId);
String cacheTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_TABLE_ID);
ReadWriteTable cache;
if (cacheTableId != null) {
- cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
+ cache = this.context.getTaskContext().getTable(cacheTableId);
} else {
cache = createDefaultCacheTable(realTableId, tableConfig);
defaultCaches.add(cache);
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
index b75a0bc..d8a5d9c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
@@ -23,7 +23,7 @@ import com.google.common.cache.Cache;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.utils.TableMetricsUtil;
@@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
* @param <K> type of the key in the cache
* @param <V> type of the value in the cache
*/
-public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V>
+public class GuavaCacheTable<K, V> extends BaseReadWriteTable<K, V>
implements ReadWriteTable<K, V> {
private final Cache<K, V> cache;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
index e45719e..042d3c7 100644
--- a/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.BaseTableProvider;
import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
import org.apache.samza.table.utils.SerdeUtils;
@@ -44,7 +44,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
}
@Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
deleted file mode 100644
index 80c2cac..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * Remote store backed read writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
- implements ReadWriteTable<K, V> {
-
- protected final TableWriteFunction<K, V> writeFn;
- protected final TableRateLimiter writeRateLimiter;
-
- public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
- TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
- ExecutorService tableExecutor, ExecutorService callbackExecutor) {
- super(tableId, readFn, readRateLimiter, tableExecutor, callbackExecutor);
- Preconditions.checkNotNull(writeFn, "null write function");
- this.writeFn = writeFn;
- this.writeRateLimiter = writeRateLimiter;
- }
-
- @Override
- public void init(Context context) {
- super.init(context);
- MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
- if (metricsConfig.getMetricsTimerEnabled()) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
- writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
- }
- }
-
- @Override
- public void put(K key, V value) {
- try {
- putAsync(key, value).get();
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> putAsync(K key, V value) {
- Preconditions.checkNotNull(key);
- if (value == null) {
- return deleteAsync(key);
- }
-
- return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.numPuts, writeMetrics.putNs)
- .exceptionally(e -> {
- throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
- });
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- try {
- putAllAsync(entries).get();
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
- Preconditions.checkNotNull(records);
- if (records.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
-
- List<K> deleteKeys = records.stream()
- .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
-
- CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
- ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
-
- List<Entry<K, V>> putRecords = records.stream()
- .filter(e -> e.getValue() != null).collect(Collectors.toList());
-
- // Return the combined future
- return CompletableFuture.allOf(
- deleteFuture,
- executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.numPutAlls, writeMetrics.putAllNs))
- .exceptionally(e -> {
- String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
- throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
- });
- }
-
- @Override
- public void delete(K key) {
- try {
- deleteAsync(key).get();
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> deleteAsync(K key) {
- Preconditions.checkNotNull(key);
- return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.numDeletes, writeMetrics.deleteNs)
- .exceptionally(e -> {
- throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
- });
- }
-
- @Override
- public void deleteAll(List<K> keys) {
- try {
- deleteAllAsync(keys).get();
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
-
- @Override
- public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
- Preconditions.checkNotNull(keys);
- if (keys.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
-
- return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs)
- .exceptionally(e -> {
- throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
- });
- }
-
- @Override
- public void flush() {
- try {
- incCounter(writeMetrics.numFlushes);
- long startNs = clock.nanoTime();
- writeFn.flush();
- updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
- } catch (Exception e) {
- String errMsg = "Failed to flush remote store";
- logger.error(errMsg, e);
- throw new SamzaException(errMsg, e);
- }
- }
-
- @Override
- public void close() {
- writeFn.close();
- super.close();
- }
-
- /**
- * Execute an async request given a table record (key+value)
- * @param rateLimiter helper for rate limiting
- * @param key key of the table record
- * @param value value of the table record
- * @param method method to be executed
- * @param timer latency metric to be updated
- * @return CompletableFuture of the operation
- */
- protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
- K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
- incCounter(counter);
- final long startNs = clock.nanoTime();
- CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
- ? CompletableFuture
- .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
- .thenCompose((r) -> method.apply(key, value))
- : method.apply(key, value);
- return completeExecution(ioFuture, startNs, timer);
- }
-
- /**
- * Execute an async request given a collection of table records
- * @param rateLimiter helper for rate limiting
- * @param records list of records
- * @param method method to be executed
- * @param timer latency metric to be updated
- * @return CompletableFuture of the operation
- */
- protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
- Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
- Counter counter, Timer timer) {
- incCounter(counter);
- final long startNs = clock.nanoTime();
- CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
- ? CompletableFuture
- .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
- .thenCompose((r) -> method.apply(records))
- : method.apply(records);
- return completeExecution(ioFuture, startNs, timer);
- }
-
- @VisibleForTesting
- public TableWriteFunction<K, V> getWriteFn() {
- return writeFn;
- }
-
- @VisibleForTesting
- public TableRateLimiter getWriteRateLimiter() {
- return writeRateLimiter;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
deleted file mode 100644
index 84a05b8..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Objects;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service.
- * <p>
- * Many stream-processing applications require to look-up data from remote data sources eg: databases,
- * web-services, RPC systems to process messages in the stream. Such access to adjunct datasets can be
- * naturally modeled as a join between the incoming stream and a {@link RemoteReadableTable}.
- * <p>
- * Example use-cases include:
- * <ul>
- * <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li>
- * <li> Scoring page views with impressions services. </li>
- * <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
- * </ul>
- * <p>
- * A {@link RemoteReadableTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
- * which encapsulate the functionality of reading and writing data to the remote service. These provide a
- * pluggable means to specify I/O operations on the table. While the base implementation merely delegates to
- * these reader and writer functions, sub-classes of {@link RemoteReadableTable} may provide rich functionality like
- * caching or throttling on top of them.
- *
- * For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter.
- * Optionally, an executor can be specified for invoking the future callbacks which otherwise are
- * executed on the threads of the underlying native data store client. This could be useful when
- * application might execute long-running operations upon future completions; another use case is to increase
- * throughput with more parallelism in the callback executions.
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
-
- protected final ExecutorService callbackExecutor;
- protected final ExecutorService tableExecutor;
- protected final TableReadFunction<K, V> readFn;
- protected final TableRateLimiter<K, V> readRateLimiter;
-
- /**
- * Construct a RemoteReadableTable instance
- * @param tableId table id
- * @param readFn {@link TableReadFunction} for read operations
- * @param rateLimiter helper for rate limiting
- * @param tableExecutor executor for issuing async requests
- * @param callbackExecutor executor for invoking async callbacks
- */
- public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn,
- TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, ExecutorService callbackExecutor) {
- super(tableId);
- Preconditions.checkNotNull(readFn, "null read function");
- this.readFn = readFn;
- this.readRateLimiter = rateLimiter;
- this.callbackExecutor = callbackExecutor;
- this.tableExecutor = tableExecutor;
- }
-
- @Override
- public void init(Context context) {
- super.init(context);
- MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
- if (metricsConfig.getMetricsTimerEnabled()) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
- readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
- }
- }
-
- @Override
- public V get(K key) {
- try {
- return getAsync(key).get();
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
-
- @Override
- public CompletableFuture<V> getAsync(K key) {
- Preconditions.checkNotNull(key);
- return execute(readRateLimiter, key, readFn::getAsync, readMetrics.numGets, readMetrics.getNs)
- .handle((result, e) -> {
- if (e != null) {
- throw new SamzaException("Failed to get the records for " + key, e);
- }
- if (result == null) {
- incCounter(readMetrics.numMissedLookups);
- }
- return result;
- });
- }
-
- @Override
- public Map<K, V> getAll(List<K> keys) {
- try {
- return getAllAsync(keys).get();
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
-
- @Override
- public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
- Preconditions.checkNotNull(keys);
- if (keys.isEmpty()) {
- return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
- }
- return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.numGetAlls, readMetrics.getAllNs)
- .handle((result, e) -> {
- if (e != null) {
- throw new SamzaException("Failed to get the records for " + keys, e);
- }
- result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
- return result;
- });
- }
-
- /**
- * Execute an async request given a table key
- * @param rateLimiter helper for rate limiting
- * @param key key of the table record
- * @param method method to be executed
- * @param timer latency metric to be updated
- * @param <T> return type
- * @return CompletableFuture of the operation
- */
- protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
- K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
- incCounter(counter);
- final long startNs = clock.nanoTime();
- CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
- ? CompletableFuture
- .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
- .thenCompose((r) -> method.apply(key))
- : method.apply(key);
- return completeExecution(ioFuture, startNs, timer);
- }
-
- /**
- * Execute an async request given a collection of table keys
- * @param rateLimiter helper for rate limiting
- * @param keys collection of keys
- * @param method method to be executed
- * @param timer latency metric to be updated
- * @param <T> return type
- * @return CompletableFuture of the operation
- */
- protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
- Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
- incCounter(counter);
- final long startNs = clock.nanoTime();
- CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
- ? CompletableFuture
- .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
- .thenCompose((r) -> method.apply(keys))
- : method.apply(keys);
- return completeExecution(ioFuture, startNs, timer);
- }
-
- /**
- * Complete the pending execution and update timer
- * @param ioFuture the future to be executed
- * @param startNs start time in nanosecond
- * @param timer latency metric to be updated
- * @param <T> return type
- * @return CompletableFuture of the operation
- */
- protected <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
- if (callbackExecutor != null) {
- ioFuture.thenApplyAsync(r -> {
- updateTimer(timer, clock.nanoTime() - startNs);
- return r;
- }, callbackExecutor);
- } else {
- ioFuture.thenApply(r -> {
- updateTimer(timer, clock.nanoTime() - startNs);
- return r;
- });
- }
- return ioFuture;
- }
-
- @Override
- public void close() {
- readFn.close();
- }
-
- @VisibleForTesting
- public ExecutorService getCallbackExecutor() {
- return callbackExecutor;
- }
-
- @VisibleForTesting
- public ExecutorService getTableExecutor() {
- return tableExecutor;
- }
-
- @VisibleForTesting
- public TableReadFunction<K, V> getReadFn() {
- return readFn;
- }
-
- @VisibleForTesting
- public TableRateLimiter<K, V> getReadRateLimiter() {
- return readRateLimiter;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
new file mode 100644
index 0000000..5b9b289
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
+
+/**
+ * A Samza {@link ReadWriteTable} backed by a remote data-store or service.
+ * <p>
+ * Many stream-processing applications need to look up data from remote data sources (e.g. databases,
+ * web services, RPC systems) to process messages in the stream. Such access to adjunct datasets can be
+ * naturally modeled as a join between the incoming stream and a table.
+ * <p>
+ * Example use-cases include:
+ * <ul>
+ * <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li>
+ * <li> Scoring page views with impressions services. </li>
+ * <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
+ * </ul>
+ * <p>
+ * A {@link RemoteTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
+ * which encapsulate the functionality of reading and writing data to the remote service. These provide a
+ * pluggable means to specify I/O operations on the table.
+ *
+ * For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter.
+ * Optionally, an executor can be specified for invoking the future callbacks which otherwise are
+ * executed on the threads of the underlying native data store client. This can be useful when
+ * the application executes long-running operations upon future completion; another use case is increasing
+ * throughput through more parallelism in callback execution.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
+ implements ReadWriteTable<K, V> {
+
+ protected final ExecutorService callbackExecutor;
+ protected final ExecutorService tableExecutor;
+ protected final TableReadFunction<K, V> readFn;
+ protected final TableWriteFunction<K, V> writeFn;
+ protected final TableRateLimiter<K, V> readRateLimiter;
+ protected final TableRateLimiter writeRateLimiter;
+
+ /**
+ * Construct a RemoteTable instance
+ * @param tableId table id
+ * @param readFn {@link TableReadFunction} for read operations; must not be null
+ * @param writeFn {@link TableWriteFunction} for write operations; may be null, in which case the
+ * put/delete methods fail their null check while flush and close skip the write function
+ * @param readRateLimiter helper for read rate limiting
+ * @param writeRateLimiter helper for write rate limiting
+ * @param tableExecutor executor for issuing async requests
+ * @param callbackExecutor executor for invoking async callbacks
+ */
+ public RemoteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
+ TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
+ ExecutorService tableExecutor, ExecutorService callbackExecutor) {
+ super(tableId);
+ Preconditions.checkNotNull(readFn, "null read function");
+ this.readFn = readFn;
+ this.writeFn = writeFn;
+ this.readRateLimiter = readRateLimiter;
+ this.writeRateLimiter = writeRateLimiter;
+ this.tableExecutor = tableExecutor;
+ this.callbackExecutor = callbackExecutor;
+ }
+
+ @Override
+ public void init(Context context) {
+ super.init(context);
+ MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
+ if (metricsConfig.getMetricsTimerEnabled()) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
+ readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+ if (writeRateLimiter != null) {
+ writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+ }
+ }
+ }
+
+ @Override
+ public V get(K key) {
+ try {
+ return getAsync(key).get();
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<V> getAsync(K key) {
+ Preconditions.checkNotNull(key);
+ return execute(readRateLimiter, key, readFn::getAsync, metrics.numGets, metrics.getNs)
+ .handle((result, e) -> {
+ if (e != null) {
+ throw new SamzaException("Failed to get the records for " + key, e);
+ }
+ if (result == null) {
+ incCounter(metrics.numMissedLookups);
+ }
+ return result;
+ });
+ }
+
+ @Override
+ public Map<K, V> getAll(List<K> keys) {
+ try {
+ return getAllAsync(keys).get();
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Asynchronously reads the values for a list of keys through the read function.
+ * An empty key list short-circuits to an already-completed future holding an empty map.
+ * Every null value in the returned map is counted as a missed lookup.
+ *
+ * @param keys keys to look up; must not be null
+ * @return a future holding the map returned by the read function, or completing exceptionally
+ * with a {@link SamzaException} naming the requested keys
+ */
+ @Override
+ public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+ Preconditions.checkNotNull(keys);
+ if (keys.isEmpty()) {
+ // Type-safe factory instead of the raw Collections.EMPTY_MAP constant (no unchecked conversion).
+ return CompletableFuture.completedFuture(Collections.emptyMap());
+ }
+ return execute(readRateLimiter, keys, readFn::getAllAsync, metrics.numGetAlls, metrics.getAllNs)
+ .handle((result, e) -> {
+ if (e != null) {
+ throw new SamzaException("Failed to get the records for " + keys, e);
+ }
+ result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
+ return result;
+ });
+ }
+
+ @Override
+ public void put(K key, V value) {
+ try {
+ putAsync(key, value).get();
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> putAsync(K key, V value) {
+ Preconditions.checkNotNull(writeFn, "null write function");
+ Preconditions.checkNotNull(key);
+ if (value == null) {
+ return deleteAsync(key);
+ }
+
+ return execute(writeRateLimiter, key, value, writeFn::putAsync, metrics.numPuts, metrics.putNs)
+ .exceptionally(e -> {
+ throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
+ });
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ try {
+ putAllAsync(entries).get();
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Asynchronously writes a batch of records. Entries with a {@code null} value are treated as
+ * deletions and routed through {@link #deleteAllAsync(List)}; the remaining entries are written
+ * with the write function's {@code putAllAsync}.
+ *
+ * @param records records to write; must not be null
+ * @return a future that completes when both the delete part and the put part have completed, or
+ * completes exceptionally with a {@link SamzaException} naming the affected keys
+ */
+ @Override
+ public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
+ Preconditions.checkNotNull(writeFn, "null write function");
+ Preconditions.checkNotNull(records);
+ if (records.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ List<K> deleteKeys = records.stream()
+ .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
+
+ CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
+ ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
+
+ List<Entry<K, V>> putRecords = records.stream()
+ .filter(e -> e.getValue() != null).collect(Collectors.toList());
+
+ // Return the combined future
+ return CompletableFuture.allOf(
+ deleteFuture,
+ executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, metrics.numPutAlls, metrics.putAllNs))
+ .exceptionally(e -> {
+ // String.valueOf tolerates a null key so that building the message cannot mask the real failure.
+ String strKeys = records.stream().map(r -> String.valueOf(r.getKey())).collect(Collectors.joining(","));
+ // Plain concatenation: keys are data and must never be parsed as a format string ('%' in a key
+ // would otherwise make String.format throw from inside this handler).
+ throw new SamzaException("Failed to put records with keys=" + strKeys, e);
+ });
+ }
+
+ @Override
+ public void delete(K key) {
+ try {
+ deleteAsync(key).get();
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Asynchronously deletes the record for the given key through the write function.
+ *
+ * @param key key of the record to delete; must not be null
+ * @return a future that completes when the delete completes, or completes exceptionally with a
+ * {@link SamzaException} naming the key
+ */
+ @Override
+ public CompletableFuture<Void> deleteAsync(K key) {
+ Preconditions.checkNotNull(writeFn, "null write function");
+ Preconditions.checkNotNull(key);
+ return execute(writeRateLimiter, key, writeFn::deleteAsync, metrics.numDeletes, metrics.deleteNs)
+ .exceptionally(e -> {
+ // Plain concatenation: the key is data and must never be parsed as a format string.
+ throw new SamzaException("Failed to delete the record for " + key, (Throwable) e);
+ });
+ }
+
+ @Override
+ public void deleteAll(List<K> keys) {
+ try {
+ deleteAllAsync(keys).get();
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Asynchronously deletes the records for the given keys through the write function.
+ * An empty key list short-circuits to an already-completed future.
+ *
+ * @param keys keys of the records to delete; must not be null
+ * @return a future that completes when the delete completes, or completes exceptionally with a
+ * {@link SamzaException} naming the keys
+ */
+ @Override
+ public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+ Preconditions.checkNotNull(writeFn, "null write function");
+ Preconditions.checkNotNull(keys);
+ if (keys.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, metrics.numDeleteAlls, metrics.deleteAllNs)
+ .exceptionally(e -> {
+ // Plain concatenation: the keys are data and must never be parsed as a format string.
+ throw new SamzaException("Failed to delete records for " + keys, (Throwable) e);
+ });
+ }
+
+ @Override
+ public void flush() {
+ if (writeFn != null) {
+ try {
+ incCounter(metrics.numFlushes);
+ long startNs = clock.nanoTime();
+ writeFn.flush();
+ updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
+ } catch (Exception e) {
+ String errMsg = "Failed to flush remote store";
+ logger.error(errMsg, e);
+ throw new SamzaException(errMsg, e);
+ }
+ }
+ }
+
+ /**
+ * Closes the read function and, when one was supplied, the write function.
+ * The write function is closed even if closing the read function throws.
+ */
+ @Override
+ public void close() {
+ try {
+ readFn.close();
+ } finally {
+ // Do not leak the write function's resources when readFn.close() fails.
+ if (writeFn != null) {
+ writeFn.close();
+ }
+ }
+ }
+
+ /**
+ * Execute an async request given a table key
+ * @param rateLimiter helper for rate limiting
+ * @param key key of the table record
+ * @param method method to be executed
+ * @param counter count metric to be updated
+ * @param timer latency metric to be updated
+ * @param <T> return type
+ * @return CompletableFuture of the operation
+ */
+ protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
+ K key, Function<K, CompletableFuture<T>> method, Counter counter, Timer timer) {
+ incCounter(counter);
+ final long startNs = clock.nanoTime();
+ CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
+ .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
+ .thenCompose((r) -> method.apply(key))
+ : method.apply(key);
+ return completeExecution(ioFuture, startNs, timer);
+ }
+
+ /**
+ * Execute an async request given a collection of table keys
+ * @param rateLimiter helper for rate limiting
+ * @param keys collection of keys
+ * @param method method to be executed
+ * @param counter count metric to be updated
+ * @param timer latency metric to be updated
+ * @param <T> return type
+ * @return CompletableFuture of the operation
+ */
+ protected <T> CompletableFuture<T> execute(TableRateLimiter<K, V> rateLimiter,
+ Collection<K> keys, Function<Collection<K>, CompletableFuture<T>> method, Counter counter, Timer timer) {
+ incCounter(counter);
+ final long startNs = clock.nanoTime();
+ CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
+ .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
+ .thenCompose((r) -> method.apply(keys))
+ : method.apply(keys);
+ return completeExecution(ioFuture, startNs, timer);
+ }
+
+ /**
+ * Complete the pending execution and update timer
+ * <p>
+ * A stage that records the time elapsed since {@code startNs} into {@code timer} is attached to
+ * {@code ioFuture}: via {@code thenApplyAsync} on {@code callbackExecutor} when one is configured,
+ * otherwise via {@code thenApply}. Such stages are skipped when {@code ioFuture} completes
+ * exceptionally, so failed operations do not update the timer.
+ * <p>
+ * NOTE(review): the derived future is discarded and {@code ioFuture} itself is returned, so stages
+ * that callers chain on the result are not routed through {@code callbackExecutor} by this method;
+ * confirm this is intended.
+ * @param ioFuture the future to be executed
+ * @param startNs start time in nanosecond
+ * @param timer latency metric to be updated
+ * @param <T> return type
+ * @return CompletableFuture of the operation
+ */
+ protected <T> CompletableFuture<T> completeExecution(CompletableFuture<T> ioFuture, long startNs, Timer timer) {
+ if (callbackExecutor != null) {
+ ioFuture.thenApplyAsync(r -> {
+ updateTimer(timer, clock.nanoTime() - startNs);
+ return r;
+ }, callbackExecutor);
+ } else {
+ ioFuture.thenApply(r -> {
+ updateTimer(timer, clock.nanoTime() - startNs);
+ return r;
+ });
+ }
+ return ioFuture;
+ }
+
+ /**
+ * Execute an async request given a table record (key+value)
+ * @param rateLimiter helper for rate limiting
+ * @param key key of the table record
+ * @param value value of the table record
+ * @param method method to be executed
+ * @param counter count metric to be updated
+ * @param timer latency metric to be updated
+ * @return CompletableFuture of the operation
+ */
+ protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
+ K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
+ incCounter(counter);
+ final long startNs = clock.nanoTime();
+ CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
+ .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
+ .thenCompose((r) -> method.apply(key, value))
+ : method.apply(key, value);
+ return completeExecution(ioFuture, startNs, timer);
+ }
+
+ /**
+ * Execute an async request given a collection of table records
+ * @param rateLimiter helper for rate limiting
+ * @param records list of records
+ * @param method method to be executed
+ * @param counter count metric to be updated
+ * @param timer latency metric to be updated
+ * @return CompletableFuture of the operation
+ */
+ protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
+ Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
+ Counter counter, Timer timer) {
+ incCounter(counter);
+ final long startNs = clock.nanoTime();
+ CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+ ? CompletableFuture
+ .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
+ .thenCompose((r) -> method.apply(records))
+ : method.apply(records);
+ return completeExecution(ioFuture, startNs, timer);
+ }
+
+ @VisibleForTesting
+ public ExecutorService getCallbackExecutor() {
+ return callbackExecutor;
+ }
+
+ @VisibleForTesting
+ public ExecutorService getTableExecutor() {
+ return tableExecutor;
+ }
+
+ @VisibleForTesting
+ public TableReadFunction<K, V> getReadFn() {
+ return readFn;
+ }
+
+ @VisibleForTesting
+ public TableRateLimiter<K, V> getReadRateLimiter() {
+ return readRateLimiter;
+ }
+
+ @VisibleForTesting
+ public TableWriteFunction<K, V> getWriteFn() {
+ return writeFn;
+ }
+
+ @VisibleForTesting
+ public TableRateLimiter getWriteRateLimiter() {
+ return writeRateLimiter;
+ }
+}
[3/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and
ReadWriteTable
Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 93a0521..8b6bc1a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -21,8 +21,7 @@ package org.apache.samza.table.remote;
import com.google.common.base.Preconditions;
import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
@@ -45,9 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class RemoteTableProvider extends BaseTableProvider {
- private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
-
- private boolean readOnly;
+ private final List<RemoteTable<?, ?>> tables = new ArrayList<>();
/**
* Map of tableId -> executor service for async table IO and callbacks. The same executors
@@ -63,21 +60,13 @@ public class RemoteTableProvider extends BaseTableProvider {
}
@Override
- public void init(Context context) {
- super.init(context);
- JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
- this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
- }
-
- @Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
- RemoteReadableTable table;
-
JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
+ // Read part
TableReadFunction readFn = getReadFn(tableConfig);
RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
if (rateLimiter != null) {
@@ -86,34 +75,29 @@ public class RemoteTableProvider extends BaseTableProvider {
TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
- TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
- TableRateLimiter writeRateLimiter = null;
-
TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
- TableRetryPolicy writeRetryPolicy = null;
-
- if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
- retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
- Thread thread = new Thread(runnable);
- thread.setName("table-retry-executor");
- thread.setDaemon(true);
- return thread;
- });
- }
-
if (readRetryPolicy != null) {
+ if (retryExecutor == null) {
+ retryExecutor = createRetryExecutor();
+ }
readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
}
- TableWriteFunction writeFn = getWriteFn(tableConfig);
-
boolean isRateLimited = readRateLimiter.isRateLimited();
- if (!readOnly) {
- writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
+
+ // Write part
+ TableWriteFunction writeFn = getWriteFn(tableConfig);
+ TableRateLimiter writeRateLimiter = null;
+ TableRetryPolicy writeRetryPolicy = null;
+ if (writeFn != null) {
+ TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
isRateLimited |= writeRateLimiter.isRateLimited();
writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
if (writeRetryPolicy != null) {
+ if (retryExecutor == null) {
+ retryExecutor = createRetryExecutor();
+ }
writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
}
}
@@ -140,13 +124,8 @@ public class RemoteTableProvider extends BaseTableProvider {
}));
}
- if (readOnly) {
- table = new RemoteReadableTable(tableId, readFn, readRateLimiter,
- tableExecutors.get(tableId), callbackExecutors.get(tableId));
- } else {
- table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter,
- writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
- }
+ RemoteTable table = new RemoteTable(tableId, readFn, writeFn, readRateLimiter,
+ writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
if (readRetryPolicy != null) {
@@ -192,5 +171,14 @@ public class RemoteTableProvider extends BaseTableProvider {
}
return writeFn;
}
+
+ private ScheduledExecutorService createRetryExecutor() {
+ return Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-retry-executor");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
new file mode 100644
index 0000000..df6833e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Utility class that contains the default set of read and write metrics.
+ */
+public class TableMetrics {
+
+ // Read metrics
+ public final Timer getNs;
+ public final Timer getAllNs;
+ public final Counter numGets;
+ public final Counter numGetAlls;
+ public final Counter numMissedLookups;
+ // Write metrics
+ public final Counter numPuts;
+ public final Timer putNs;
+ public final Counter numPutAlls;
+ public final Timer putAllNs;
+ public final Counter numDeletes;
+ public final Timer deleteNs;
+ public final Counter numDeleteAlls;
+ public final Timer deleteAllNs;
+ public final Counter numFlushes;
+ public final Timer flushNs;
+
+ /**
+ * Creates the default read and write counters and timers for a table using the given context
+ *
+ * @param context {@link Context} for this task
+ * @param table underlying table
+ * @param tableId table Id
+ */
+ public TableMetrics(Context context, Table table, String tableId) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+ // Read metrics
+ numGets = tableMetricsUtil.newCounter("num-gets");
+ getNs = tableMetricsUtil.newTimer("get-ns");
+ numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+ getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+ numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
+ // Write metrics
+ numPuts = tableMetricsUtil.newCounter("num-puts");
+ putNs = tableMetricsUtil.newTimer("put-ns");
+ numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+ putAllNs = tableMetricsUtil.newTimer("putAll-ns");
+ numDeletes = tableMetricsUtil.newCounter("num-deletes");
+ deleteNs = tableMetricsUtil.newTimer("delete-ns");
+ numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+ deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
+ numFlushes = tableMetricsUtil.newCounter("num-flushes");
+ flushNs = tableMetricsUtil.newTimer("flush-ns");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
deleted file mode 100644
index e77fcfd..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-/**
- * Utility class that contains the default set of read metrics.
- */
-public class TableReadMetrics {
-
- public final Timer getNs;
- public final Timer getAllNs;
- public final Counter numGets;
- public final Counter numGetAlls;
- public final Counter numMissedLookups;
-
- /**
- * Constructor based on container and task container context
- *
- * @param context {@link Context} for this task
- * @param table underlying table
- * @param tableId table Id
- */
- public TableReadMetrics(Context context, Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
- numGets = tableMetricsUtil.newCounter("num-gets");
- getNs = tableMetricsUtil.newTimer("get-ns");
- numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
- getAllNs = tableMetricsUtil.newTimer("getAll-ns");
- numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
deleted file mode 100644
index bf65b74..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-public class TableWriteMetrics {
-
- public final Counter numPuts;
- public final Timer putNs;
- public final Counter numPutAlls;
- public final Timer putAllNs;
- public final Counter numDeletes;
- public final Timer deleteNs;
- public final Counter numDeleteAlls;
- public final Timer deleteAllNs;
- public final Counter numFlushes;
- public final Timer flushNs;
-
- /**
- * Utility class that contains the default set of write metrics.
- *
- * @param context {@link Context} for this task
- * @param table underlying table
- * @param tableId table Id
- */
- public TableWriteMetrics(Context context, Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
- numPuts = tableMetricsUtil.newCounter("num-puts");
- putNs = tableMetricsUtil.newTimer("put-ns");
- numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
- putAllNs = tableMetricsUtil.newTimer("putAll-ns");
- numDeletes = tableMetricsUtil.newCounter("num-deletes");
- deleteNs = tableMetricsUtil.newTimer("delete-ns");
- numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
- deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
- numFlushes = tableMetricsUtil.newCounter("num-flushes");
- flushNs = tableMetricsUtil.newTimer("flush-ns");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 4112c8b..8fd161b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -26,7 +26,7 @@ import org.apache.samza.operators.KV;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
@@ -71,7 +71,7 @@ public class TestStreamTableJoinOperatorImpl {
return record.getKey();
}
});
- ReadableTable table = mock(ReadableTable.class);
+ ReadWriteTable table = mock(ReadWriteTable.class);
when(table.get("1")).thenReturn("r1");
when(table.get("2")).thenReturn(null);
Context context = new MockContext();
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index a3b1963..e60b6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -49,12 +49,12 @@ public class TestTableManager {
public static class DummyTableProviderFactory implements TableProviderFactory {
- static ReadableTable table;
+ static ReadWriteTable table;
static TableProvider tableProvider;
@Override
public TableProvider getTableProvider(String tableId) {
- table = mock(ReadableTable.class);
+ table = mock(ReadWriteTable.class);
tableProvider = mock(TableProvider.class);
when(tableProvider.getTable()).thenReturn(table);
return tableProvider;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 5a19767..c304bfd 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -35,11 +35,10 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.caching.guava.GuavaCacheTable;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
@@ -138,11 +137,11 @@ public class TestCachingTable {
return Pair.of(cacheTable, cacheStore);
}
- private void initTables(ReadableTable ... tables) {
+ private void initTables(ReadWriteTable ... tables) {
initTables(false, tables);
}
- private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) {
+ private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... tables) {
Map<String, String> config = new HashMap<>();
if (isTimerMetricsDisabled) {
config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
@@ -242,7 +241,7 @@ public class TestCachingTable {
@Test
public void testNonexistentKeyInTable() {
- ReadableTable<String, String> table = mock(ReadableTable.class);
+ ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
doReturn(CompletableFuture.completedFuture(null)).when(table).getAsync(any());
ReadWriteTable<String, String> cache = getMockCache().getLeft();
CachingTable<String, String> cachingTable = new CachingTable<>("myTable", table, cache, false);
@@ -255,7 +254,7 @@ public class TestCachingTable {
@Test
public void testKeyEviction() {
- ReadableTable<String, String> table = mock(ReadableTable.class);
+ ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
doReturn(CompletableFuture.completedFuture("3")).when(table).getAsync(any());
ReadWriteTable<String, String> cache = mock(ReadWriteTable.class);
@@ -283,7 +282,7 @@ public class TestCachingTable {
TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+ final RemoteTable<String, String> remoteTable = new RemoteTable<>(
tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
@@ -402,7 +401,7 @@ public class TestCachingTable {
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
- final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+ final RemoteTable<String, String> remoteTable = new RemoteTable<>(
tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
index 486295a..a764a8b 100644
--- a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
@@ -30,7 +30,7 @@ import org.apache.samza.context.Context;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableProviderFactory;
import org.junit.Test;
@@ -143,7 +143,7 @@ public class TestLocalTableDescriptor {
}
@Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
throw new SamzaException("Not implemented");
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
deleted file mode 100644
index d7733a8..0000000
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-
-public class TestRemoteReadWriteTable {
- private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
-
- public static Context getMockContext() {
- Context context = new MockContext();
- MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
- doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
- doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
- doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
- doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
- return context;
- }
-
- private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
- TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
- return getTable(tableId, readFn, writeFn, null);
- }
-
- private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
- TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
- RemoteReadableTable<K, V> table;
-
- TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
- TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
- doReturn(true).when(readRateLimiter).isRateLimited();
- doReturn(true).when(writeRateLimiter).isRateLimited();
-
- ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
-
- if (writeFn == null) {
- table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
- } else {
- table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
- }
-
- Context context = getMockContext();
-
- table.init(context);
-
- return (T) table;
- }
-
- private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
- String tableId = "testGet-" + sync + error + retry;
- TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
- // Sync is backed by async so needs to mock the async method
- CompletableFuture<String> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- if (!retry) {
- doReturn(future).when(readFn).getAsync(anyString());
- } else {
- final int [] times = new int[] {0};
- doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
- .when(readFn).getAsync(anyString());
- }
- } else {
- future = CompletableFuture.completedFuture("bar");
- doReturn(future).when(readFn).getAsync(anyString());
- }
- if (retry) {
- doReturn(true).when(readFn).isRetriable(any());
- TableRetryPolicy policy = new TableRetryPolicy();
- readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
- }
- RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
- Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
- verify(table.readRateLimiter, times(1)).throttle(anyString());
- }
-
- @Test
- public void testGet() throws Exception {
- doTestGet(true, false, false);
- }
-
- @Test
- public void testGetAsync() throws Exception {
- doTestGet(false, false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testGetAsyncError() throws Exception {
- doTestGet(false, true, false);
- }
-
- @Test
- public void testGetAsyncErrorRetried() throws Exception {
- doTestGet(false, true, true);
- }
-
- @Test
- public void testGetMultipleTables() {
- TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
- TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
-
- // Sync is backed by async so needs to mock the async method
- doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
- doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
-
- RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
- RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
-
- CompletableFuture<String> future1 = table1.getAsync("foo1");
- CompletableFuture<String> future2 = table2.getAsync("foo2");
-
- CompletableFuture.allOf(future1, future2)
- .thenAccept(u -> {
- Assert.assertEquals(future1.join(), "bar1");
- Assert.assertEquals(future2.join(), "bar1");
- });
- }
-
- private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
- String tableId = "testPut-" + sync + error + isDelete + retry;
- TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
- TableWriteFunction<String, String> writeFn = mockWriteFn;
- CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
- CompletableFuture<Void> failureFuture = new CompletableFuture();
- failureFuture.completeExceptionally(new RuntimeException("Test exception"));
- if (!error) {
- if (isDelete) {
- doReturn(successFuture).when(writeFn).deleteAsync(any());
- } else {
- doReturn(successFuture).when(writeFn).putAsync(any(), any());
- }
- } else if (!retry) {
- if (isDelete) {
- doReturn(failureFuture).when(writeFn).deleteAsync(any());
- } else {
- doReturn(failureFuture).when(writeFn).putAsync(any(), any());
- }
- } else {
- doReturn(true).when(writeFn).isRetriable(any());
- final int [] times = new int[] {0};
- if (isDelete) {
- doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
- } else {
- doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
- }
- writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
- }
- RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
- if (sync) {
- table.put("foo", isDelete ? null : "bar");
- } else {
- table.putAsync("foo", isDelete ? null : "bar").get();
- }
- ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
- ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
- if (isDelete) {
- verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
- } else {
- verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
- Assert.assertEquals("bar", valCaptor.getValue());
- }
- Assert.assertEquals("foo", keyCaptor.getValue());
- if (isDelete) {
- verify(table.writeRateLimiter, times(1)).throttle(anyString());
- } else {
- verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
- }
- }
-
- @Test
- public void testPut() throws Exception {
- doTestPut(true, false, false, false);
- }
-
- @Test
- public void testPutDelete() throws Exception {
- doTestPut(true, false, true, false);
- }
-
- @Test
- public void testPutAsync() throws Exception {
- doTestPut(false, false, false, false);
- }
-
- @Test
- public void testPutAsyncDelete() throws Exception {
- doTestPut(false, false, true, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testPutAsyncError() throws Exception {
- doTestPut(false, true, false, false);
- }
-
- @Test
- public void testPutAsyncErrorRetried() throws Exception {
- doTestPut(false, true, false, true);
- }
-
- private void doTestDelete(boolean sync, boolean error) throws Exception {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
- mock(TableReadFunction.class), writeFn);
- CompletableFuture<Void> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(writeFn).deleteAsync(any());
- ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
- if (sync) {
- table.delete("foo");
- } else {
- table.deleteAsync("foo").get();
- }
- verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
- Assert.assertEquals("foo", argCaptor.getValue());
- verify(table.writeRateLimiter, times(1)).throttle(anyString());
- }
-
- @Test
- public void testDelete() throws Exception {
- doTestDelete(true, false);
- }
-
- @Test
- public void testDeleteAsync() throws Exception {
- doTestDelete(false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testDeleteAsyncError() throws Exception {
- doTestDelete(false, true);
- }
-
- private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
- TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
- Map<String, String> res = new HashMap<>();
- res.put("foo1", "bar1");
- if (!partial) {
- res.put("foo2", "bar2");
- }
- CompletableFuture<Map<String, String>> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(res);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(readFn).getAllAsync(any());
- RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
- Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
- : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
- verify(table.readRateLimiter, times(1)).throttle(anyCollection());
- }
-
- @Test
- public void testGetAll() throws Exception {
- doTestGetAll(true, false, false);
- }
-
- @Test
- public void testGetAllAsync() throws Exception {
- doTestGetAll(false, false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testGetAllAsyncError() throws Exception {
- doTestGetAll(false, true, false);
- }
-
- // Partial result is an acceptable scenario
- @Test
- public void testGetAllPartialResult() throws Exception {
- doTestGetAll(false, false, true);
- }
-
- public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
- mock(TableReadFunction.class), writeFn);
- CompletableFuture<Void> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(writeFn).putAllAsync(any());
- if (hasDelete) {
- doReturn(future).when(writeFn).deleteAllAsync(any());
- }
- List<Entry<String, String>> entries = Arrays.asList(
- new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
- ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
- if (sync) {
- table.putAll(entries);
- } else {
- table.putAllAsync(entries).get();
- }
- verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
- if (hasDelete) {
- ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
- verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
- Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
- Assert.assertEquals(1, argCaptor.getValue().size());
- Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
- verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
- } else {
- Assert.assertEquals(entries, argCaptor.getValue());
- }
- verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
- }
-
- @Test
- public void testPutAll() throws Exception {
- doTestPutAll(true, false, false);
- }
-
- @Test
- public void testPutAllHasDelete() throws Exception {
- doTestPutAll(true, false, true);
- }
-
- @Test
- public void testPutAllAsync() throws Exception {
- doTestPutAll(false, false, false);
- }
-
- @Test
- public void testPutAllAsyncHasDelete() throws Exception {
- doTestPutAll(false, false, true);
- }
-
- @Test(expected = ExecutionException.class)
- public void testPutAllAsyncError() throws Exception {
- doTestPutAll(false, true, false);
- }
-
- public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
- mock(TableReadFunction.class), writeFn);
- CompletableFuture<Void> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(writeFn).deleteAllAsync(any());
- List<String> keys = Arrays.asList("foo1", "foo2");
- ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
- if (sync) {
- table.deleteAll(keys);
- } else {
- table.deleteAllAsync(keys).get();
- }
- verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
- Assert.assertEquals(keys, argCaptor.getValue());
- verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
- }
-
- @Test
- public void testDeleteAll() throws Exception {
- doTestDeleteAll(true, false);
- }
-
- @Test
- public void testDeleteAllAsync() throws Exception {
- doTestDeleteAll(false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testDeleteAllAsyncError() throws Exception {
- doTestDeleteAll(false, true);
- }
-
- @Test
- public void testFlush() {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
- table.flush();
- verify(writeFn, times(1)).flush();
- }
-
- @Test
- public void testGetWithCallbackExecutor() throws Exception {
- TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
- // Sync is backed by async so needs to mock the async method
- doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
- RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
- Executors.newSingleThreadExecutor());
- Thread testThread = Thread.currentThread();
-
- table.getAsync("foo").thenAccept(result -> {
- Assert.assertEquals("bar", result);
- // Must be executed on the executor thread
- Assert.assertNotSame(testThread, Thread.currentThread());
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
new file mode 100644
index 0000000..ae96d86
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRemoteTable {
+ private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+ public static Context getMockContext() {
+ Context context = new MockContext();
+ MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
+ doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
+ doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
+ doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
+ doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
+ return context;
+ }
+
+ private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+ TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
+ return getTable(tableId, readFn, writeFn, null);
+ }
+
+ private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+ TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
+ RemoteTable<K, V> table;
+
+ TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
+ TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
+ doReturn(true).when(readRateLimiter).isRateLimited();
+ doReturn(true).when(writeRateLimiter).isRateLimited();
+
+ ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
+
+ table = new RemoteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
+
+ Context context = getMockContext();
+
+ table.init(context);
+
+ return (T) table;
+ }
+
+ private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
+ String tableId = "testGet-" + sync + error + retry;
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ // Sync is backed by async so needs to mock the async method
+ CompletableFuture<String> future;
+ if (error) {
+ future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("Test exception"));
+ if (!retry) {
+ doReturn(future).when(readFn).getAsync(anyString());
+ } else {
+ final int [] times = new int[] {0};
+ doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
+ .when(readFn).getAsync(anyString());
+ }
+ } else {
+ future = CompletableFuture.completedFuture("bar");
+ doReturn(future).when(readFn).getAsync(anyString());
+ }
+ if (retry) {
+ doReturn(true).when(readFn).isRetriable(any());
+ TableRetryPolicy policy = new TableRetryPolicy();
+ readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
+ }
+ RemoteTable<String, String> table = getTable(tableId, readFn, null);
+ Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
+ verify(table.readRateLimiter, times(1)).throttle(anyString());
+ }
+
+ @Test
+ public void testGet() throws Exception {
+ doTestGet(true, false, false);
+ }
+
+ @Test
+ public void testGetAsync() throws Exception {
+ doTestGet(false, false, false);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testGetAsyncError() throws Exception {
+ doTestGet(false, true, false);
+ }
+
+ @Test
+ public void testGetAsyncErrorRetried() throws Exception {
+ doTestGet(false, true, true);
+ }
+
+ @Test
+ public void testGetMultipleTables() {
+ TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
+ TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
+
+ // Each table is backed by its own read function returning a distinct value, so a mix-up between tables is detectable
+ doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
+ doReturn(CompletableFuture.completedFuture("bar2")).when(readFn2).getAsync(anyString());
+
+ RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
+ RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
+
+ CompletableFuture<String> future1 = table1.getAsync("foo1");
+ CompletableFuture<String> future2 = table2.getAsync("foo2");
+
+ CompletableFuture.allOf(future1, future2)
+ .thenAccept(u -> {
+ Assert.assertEquals("bar1", future1.join());
+ Assert.assertEquals("bar2", future2.join());
+ }).join(); // block on the callback so that an assertion failure inside it actually fails the test
+ }
+
+ private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
+ String tableId = "testPut-" + sync + error + isDelete + retry;
+ TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+ TableWriteFunction<String, String> writeFn = mockWriteFn;
+ CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
+ CompletableFuture<Void> failureFuture = new CompletableFuture();
+ failureFuture.completeExceptionally(new RuntimeException("Test exception"));
+ if (!error) {
+ if (isDelete) {
+ doReturn(successFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doReturn(successFuture).when(writeFn).putAsync(any(), any());
+ }
+ } else if (!retry) {
+ if (isDelete) {
+ doReturn(failureFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doReturn(failureFuture).when(writeFn).putAsync(any(), any());
+ }
+ } else {
+ doReturn(true).when(writeFn).isRetriable(any());
+ final int [] times = new int[] {0};
+ if (isDelete) {
+ doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
+ }
+ writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
+ }
+ RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
+ if (sync) {
+ table.put("foo", isDelete ? null : "bar");
+ } else {
+ table.putAsync("foo", isDelete ? null : "bar").get();
+ }
+ ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
+ if (isDelete) {
+ verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
+ } else {
+ verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
+ Assert.assertEquals("bar", valCaptor.getValue());
+ }
+ Assert.assertEquals("foo", keyCaptor.getValue());
+ if (isDelete) {
+ verify(table.writeRateLimiter, times(1)).throttle(anyString());
+ } else {
+ verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
+ }
+ }
+
+ @Test
+ public void testPut() throws Exception {
+ doTestPut(true, false, false, false);
+ }
+
+ @Test
+ public void testPutDelete() throws Exception {
+ doTestPut(true, false, true, false);
+ }
+
+ @Test
+ public void testPutAsync() throws Exception {
+ doTestPut(false, false, false, false);
+ }
+
+ @Test
+ public void testPutAsyncDelete() throws Exception {
+ doTestPut(false, false, true, false);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testPutAsyncError() throws Exception {
+ doTestPut(false, true, false, false);
+ }
+
+ @Test
+ public void testPutAsyncErrorRetried() throws Exception {
+ doTestPut(false, true, false, true);
+ }
+
+ private void doTestDelete(boolean sync, boolean error) throws Exception {
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ RemoteTable<String, String> table = getTable("testDelete-" + sync + error,
+ mock(TableReadFunction.class), writeFn);
+ CompletableFuture<Void> future;
+ if (error) {
+ future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+ // Sync is backed by async so needs to mock the async method
+ doReturn(future).when(writeFn).deleteAsync(any());
+ ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
+ if (sync) {
+ table.delete("foo");
+ } else {
+ table.deleteAsync("foo").get();
+ }
+ verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
+ Assert.assertEquals("foo", argCaptor.getValue());
+ verify(table.writeRateLimiter, times(1)).throttle(anyString());
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ doTestDelete(true, false);
+ }
+
+ @Test
+ public void testDeleteAsync() throws Exception {
+ doTestDelete(false, false);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testDeleteAsyncError() throws Exception {
+ doTestDelete(false, true);
+ }
+
+ private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ Map<String, String> res = new HashMap<>();
+ res.put("foo1", "bar1");
+ if (!partial) {
+ res.put("foo2", "bar2");
+ }
+ CompletableFuture<Map<String, String>> future;
+ if (error) {
+ future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ future = CompletableFuture.completedFuture(res);
+ }
+ // Sync is backed by async so needs to mock the async method
+ doReturn(future).when(readFn).getAllAsync(any());
+ RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
+ Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
+ : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
+ verify(table.readRateLimiter, times(1)).throttle(anyCollection());
+ }
+
+ @Test
+ public void testGetAll() throws Exception {
+ doTestGetAll(true, false, false);
+ }
+
+ @Test
+ public void testGetAllAsync() throws Exception {
+ doTestGetAll(false, false, false);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testGetAllAsyncError() throws Exception {
+ doTestGetAll(false, true, false);
+ }
+
+ // Partial result is an acceptable scenario
+ @Test
+ public void testGetAllPartialResult() throws Exception {
+ doTestGetAll(false, false, true);
+ }
+
+ public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
+ mock(TableReadFunction.class), writeFn);
+ CompletableFuture<Void> future;
+ if (error) {
+ future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+ // Sync is backed by async so needs to mock the async method
+ doReturn(future).when(writeFn).putAllAsync(any());
+ if (hasDelete) {
+ doReturn(future).when(writeFn).deleteAllAsync(any());
+ }
+ List<Entry<String, String>> entries = Arrays.asList(
+ new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
+ ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+ if (sync) {
+ table.putAll(entries);
+ } else {
+ table.putAllAsync(entries).get();
+ }
+ verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
+ if (hasDelete) {
+ ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
+ verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
+ Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
+ Assert.assertEquals(1, argCaptor.getValue().size());
+ Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
+ verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+ } else {
+ Assert.assertEquals(entries, argCaptor.getValue());
+ }
+ verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
+ }
+
+ @Test
+ public void testPutAll() throws Exception {
+ doTestPutAll(true, false, false);
+ }
+
+ @Test
+ public void testPutAllHasDelete() throws Exception {
+ doTestPutAll(true, false, true);
+ }
+
+ @Test
+ public void testPutAllAsync() throws Exception {
+ doTestPutAll(false, false, false);
+ }
+
+ @Test
+ public void testPutAllAsyncHasDelete() throws Exception {
+ doTestPutAll(false, false, true);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testPutAllAsyncError() throws Exception {
+ doTestPutAll(false, true, false);
+ }
+
+ public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
+ mock(TableReadFunction.class), writeFn);
+ CompletableFuture<Void> future;
+ if (error) {
+ future = new CompletableFuture();
+ future.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ future = CompletableFuture.completedFuture(null);
+ }
+ // Sync is backed by async so needs to mock the async method
+ doReturn(future).when(writeFn).deleteAllAsync(any());
+ List<String> keys = Arrays.asList("foo1", "foo2");
+ ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
+ if (sync) {
+ table.deleteAll(keys);
+ } else {
+ table.deleteAllAsync(keys).get();
+ }
+ verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
+ Assert.assertEquals(keys, argCaptor.getValue());
+ verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+ }
+
+ @Test
+ public void testDeleteAll() throws Exception {
+ doTestDeleteAll(true, false);
+ }
+
+ @Test
+ public void testDeleteAllAsync() throws Exception {
+ doTestDeleteAll(false, false);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testDeleteAllAsyncError() throws Exception {
+ doTestDeleteAll(false, true);
+ }
+
+ @Test
+ public void testFlush() {
+ TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
+ RemoteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
+ table.flush();
+ verify(writeFn, times(1)).flush();
+ }
+
+ @Test
+ public void testGetWithCallbackExecutor() throws Exception {
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ // Sync is backed by async so needs to mock the async method
+ doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+ RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
+ Executors.newSingleThreadExecutor());
+ Thread testThread = Thread.currentThread();
+
+ table.getAsync("foo").thenAccept(result -> {
+ Assert.assertEquals("bar", result);
+ // Must be executed on the executor thread. NOTE(review): the future returned by thenAccept is never awaited here, so a failure in this callback does not appear to fail the test - confirm
+ Assert.assertNotSame(testThread, Thread.currentThread());
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 3d8e36f..907242f 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -36,17 +36,17 @@ import org.apache.samza.metrics.Timer;
import org.apache.samza.table.Table;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.table.remote.RemoteTableProvider;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
-
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.util.EmbeddedTaggedRateLimiter;
import org.apache.samza.util.RateLimiter;
+
import org.junit.Assert;
import org.junit.Test;
@@ -207,8 +207,8 @@ public class TestRemoteTableDescriptor {
RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId());
provider.init(createMockContext(desc));
Table table = provider.getTable();
- Assert.assertTrue(table instanceof RemoteReadWriteTable);
- RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
+ Assert.assertTrue(table instanceof RemoteTable);
+ RemoteTable rwTable = (RemoteTable) table;
if (numRateLimitOps > 0) {
Assert.assertTrue(!rlGets || rwTable.getReadRateLimiter() != null);
Assert.assertTrue(!rlPuts || rwTable.getWriteRateLimiter() != null);
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
index 5116cab..050ea55 100644
--- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -35,7 +35,7 @@ import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.Table;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.remote.TestRemoteReadWriteTable;
+import org.apache.samza.table.remote.TestRemoteTable;
import org.apache.samza.table.utils.TableMetricsUtil;
import org.junit.Test;
@@ -54,7 +54,7 @@ public class TestRetriableTableFunctions {
public TableMetricsUtil getMetricsUtil(String tableId) {
Table table = mock(Table.class);
- Context context = TestRemoteReadWriteTable.getMockContext();
+ Context context = TestRemoteTable.getMockContext();
return new TableMetricsUtil(context, table, tableId);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
index ddb79ba..fc9ce76 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.samza.storage.kv.inmemory;
import java.util.Map;
-import junit.framework.Assert;
+
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
@@ -28,7 +28,9 @@ import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.storage.kv.LocalTableProviderFactory;
import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+
import org.junit.Test;
+import org.junit.Assert;
public class TestInMemoryTableDescriptor {
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index 62fb3da..319fb0f 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.samza.storage.kv.descriptors;
import java.util.Map;
-import junit.framework.Assert;
+
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
@@ -30,7 +30,9 @@ import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.LocalTableProviderFactory;
import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+
import org.junit.Test;
+import org.junit.Assert;
public class TestRocksDbTableDescriptor {
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
deleted file mode 100644
index eae6bb0..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * A store backed readable and writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
- implements ReadWriteTable<K, V> {
-
- /**
- * Constructs an instance of {@link LocalReadWriteTable}
- * @param tableId the table Id
- * @param kvStore the backing store
- */
- public LocalReadWriteTable(String tableId, KeyValueStore kvStore) {
- super(tableId, kvStore);
- }
-
- @Override
- public void put(K key, V value) {
- if (value != null) {
- instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value));
- } else {
- delete(key);
- }
- }
-
- @Override
- public CompletableFuture<Void> putAsync(K key, V value) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- put(key, value);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- List<Entry<K, V>> toPut = new LinkedList<>();
- List<K> toDelete = new LinkedList<>();
- entries.forEach(e -> {
- if (e.getValue() != null) {
- toPut.add(e);
- } else {
- toDelete.add(e.getKey());
- }
- });
-
- if (!toPut.isEmpty()) {
- instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(toPut));
- }
-
- if (!toDelete.isEmpty()) {
- deleteAll(toDelete);
- }
- }
-
- @Override
- public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- putAll(entries);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void delete(K key) {
- instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key));
- }
-
- @Override
- public CompletableFuture<Void> deleteAsync(K key) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- delete(key);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void deleteAll(List<K> keys) {
- instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys));
- }
-
- @Override
- public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- deleteAll(keys);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void flush() {
- instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush());
- }
-
- private interface Func0 {
- void apply();
- }
-
- private void instrument(Counter counter, Timer timer, Func0 func) {
- incCounter(counter);
- long startNs = clock.nanoTime();
- func.apply();
- updateTimer(timer, clock.nanoTime() - startNs);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
deleted file mode 100644
index 29ddb15..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-
-import com.google.common.base.Supplier;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A store backed readable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
-
- protected final KeyValueStore<K, V> kvStore;
-
- /**
- * Constructs an instance of {@link LocalReadableTable}
- * @param tableId the table Id
- * @param kvStore the backing store
- */
- public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
- super(tableId);
- Preconditions.checkNotNull(kvStore, "null KeyValueStore");
- this.kvStore = kvStore;
- }
-
- @Override
- public V get(K key) {
- V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key));
- if (result == null) {
- incCounter(readMetrics.numMissedLookups);
- }
- return result;
- }
-
- @Override
- public CompletableFuture<V> getAsync(K key) {
- CompletableFuture<V> future = new CompletableFuture();
- try {
- future.complete(get(key));
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public Map<K, V> getAll(List<K> keys) {
- Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys));
- result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
- return result;
- }
-
- @Override
- public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
- CompletableFuture<Map<K, V>> future = new CompletableFuture();
- try {
- future.complete(getAll(keys));
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void close() {
- // The KV store is not closed here as it may still be needed by downstream operators,
- // it will be closed by the SamzaContainer
- }
-
- private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
- incCounter(counter);
- long startNs = clock.nanoTime();
- T result = func.get();
- updateTimer(timer, clock.nanoTime() - startNs);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
new file mode 100644
index 0000000..d9767b6
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.BaseReadWriteTable;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
+
+/**
+ * A store backed readable and writable table
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalTable<K, V> extends BaseReadWriteTable<K, V> {
+
+  protected final KeyValueStore<K, V> kvStore;
+
+  /**
+   * Constructs an instance of {@link LocalTable}
+   * @param tableId the table Id
+   * @param kvStore the backing store; must not be null
+   * @throws NullPointerException if {@code kvStore} is null
+   */
+  public LocalTable(String tableId, KeyValueStore kvStore) {
+    super(tableId);
+    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
+    // NOTE(review): the parameter is a raw KeyValueStore, so this assignment is unchecked;
+    // callers are trusted to pass a store whose key/value types match K and V.
+    this.kvStore = kvStore;
+  }
+
+  /** Looks up {@code key} in the store, counting a missed lookup when the result is null. */
+  @Override
+  public V get(K key) {
+    V result = instrument(metrics.numGets, metrics.getNs, () -> kvStore.get(key));
+    if (result == null) {
+      incCounter(metrics.numMissedLookups);
+    }
+    return result;
+  }
+
+  /** Runs {@link #get} on the calling thread and returns an already-completed future. */
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    return supplyNow(() -> get(key));
+  }
+
+  /** Looks up all {@code keys}, counting one missed lookup per null value returned. */
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Map<K, V> result = instrument(metrics.numGetAlls, metrics.getAllNs, () -> kvStore.getAll(keys));
+    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
+    return result;
+  }
+
+  /** Runs {@link #getAll} on the calling thread and returns an already-completed future. */
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    return supplyNow(() -> getAll(keys));
+  }
+
+  /** Stores {@code value} under {@code key}; a null value is treated as a delete of the key. */
+  @Override
+  public void put(K key, V value) {
+    if (value != null) {
+      instrument(metrics.numPuts, metrics.putNs, () -> kvStore.put(key, value));
+    } else {
+      delete(key);
+    }
+  }
+
+  /** Runs {@link #put} on the calling thread and returns an already-completed future. */
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    return runNow(() -> put(key, value));
+  }
+
+  /**
+   * Writes the entries with non-null values in one store putAll, then removes the keys of
+   * the entries with null values in one deleteAll. Either call is skipped when it has no input.
+   */
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    List<Entry<K, V>> toPut = new LinkedList<>();
+    List<K> toDelete = new LinkedList<>();
+    entries.forEach(e -> {
+      if (e.getValue() != null) {
+        toPut.add(e);
+      } else {
+        toDelete.add(e.getKey());
+      }
+    });
+
+    if (!toPut.isEmpty()) {
+      instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(toPut));
+    }
+
+    if (!toDelete.isEmpty()) {
+      deleteAll(toDelete);
+    }
+  }
+
+  /** Runs {@link #putAll} on the calling thread and returns an already-completed future. */
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
+    return runNow(() -> putAll(entries));
+  }
+
+  /** Removes {@code key} from the store. */
+  @Override
+  public void delete(K key) {
+    instrument(metrics.numDeletes, metrics.deleteNs, () -> kvStore.delete(key));
+  }
+
+  /** Runs {@link #delete} on the calling thread and returns an already-completed future. */
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    return runNow(() -> delete(key));
+  }
+
+  /** Removes all {@code keys} from the store. */
+  @Override
+  public void deleteAll(List<K> keys) {
+    instrument(metrics.numDeleteAlls, metrics.deleteAllNs, () -> kvStore.deleteAll(keys));
+  }
+
+  /** Runs {@link #deleteAll} on the calling thread and returns an already-completed future. */
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    return runNow(() -> deleteAll(keys));
+  }
+
+  /** Flushes the backing store. */
+  @Override
+  public void flush() {
+    instrument(metrics.numFlushes, metrics.flushNs, () -> kvStore.flush());
+  }
+
+  @Override
+  public void close() {
+    // The KV store is not closed here as it may still be needed by downstream operators,
+    // it will be closed by the SamzaContainer
+  }
+
+  /** Counts and times a store call that returns a value; the timer is skipped if it throws. */
+  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
+    incCounter(counter);
+    long startNs = clock.nanoTime();
+    T result = func.get();
+    updateTimer(timer, clock.nanoTime() - startNs);
+    return result;
+  }
+
+  private interface Func0 {
+    void apply();
+  }
+
+  /** Counts and times a store call that returns nothing; the timer is skipped if it throws. */
+  private void instrument(Counter counter, Timer timer, Func0 func) {
+    incCounter(counter);
+    long startNs = clock.nanoTime();
+    func.apply();
+    updateTimer(timer, clock.nanoTime() - startNs);
+  }
+
+  /** Evaluates {@code op} immediately; the future holds its result or the exception it threw. */
+  private <T> CompletableFuture<T> supplyNow(Supplier<T> op) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    try {
+      future.complete(op.get());
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  /** Runs {@code op} immediately; the future holds null or the exception it threw. */
+  private CompletableFuture<Void> runNow(Func0 op) {
+    return this.<Void>supplyNow(() -> {
+      op.apply();
+      return null;
+    });
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
index 3be61d0..5099a7e 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
@@ -20,8 +20,7 @@ package org.apache.samza.storage.kv;
import com.google.common.base.Preconditions;
import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.BaseTableProvider;
/**
@@ -53,10 +52,10 @@ public class LocalTableProvider extends BaseTableProvider {
}
@Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
Preconditions.checkNotNull(kvStore, "Store not initialized for table " + tableId);
- ReadableTable table = new LocalReadWriteTable(tableId, kvStore);
+ ReadWriteTable table = new LocalTable(tableId, kvStore);
table.init(this.context);
return table;
}
[2/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and
ReadWriteTable
Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
deleted file mode 100644
index 044fab4..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestLocalReadWriteTable {
-
- public static final String TABLE_ID = "t1";
-
- private Timer putNs;
- private Timer putAllNs;
- private Timer deleteNs;
- private Timer deleteAllNs;
- private Timer flushNs;
- private Counter numPuts;
- private Counter numPutAlls;
- private Counter numDeletes;
- private Counter numDeleteAlls;
- private Counter numFlushes;
- private Timer putCallbackNs;
- private Timer deleteCallbackNs;
-
- private MetricsRegistry metricsRegistry;
-
- private KeyValueStore kvStore;
-
- @Before
- public void setUp() {
-
- putNs = new Timer("");
- putAllNs = new Timer("");
- deleteNs = new Timer("");
- deleteAllNs = new Timer("");
- flushNs = new Timer("");
- numPuts = new Counter("");
- numPutAlls = new Counter("");
- numDeletes = new Counter("");
- numDeleteAlls = new Counter("");
- numFlushes = new Counter("");
- putCallbackNs = new Timer("");
- deleteCallbackNs = new Timer("");
-
- metricsRegistry = mock(MetricsRegistry.class);
- String groupName = LocalReadWriteTable.class.getSimpleName();
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
-
- kvStore = mock(KeyValueStore.class);
- }
-
- @Test
- public void testPut() throws Exception {
- ReadWriteTable table = createTable(false);
- table.put("k1", "v1");
- table.putAsync("k2", "v2").get();
- table.putAsync("k3", null).get();
- verify(kvStore, times(2)).put(any(), any());
- verify(kvStore, times(1)).delete(any());
- Assert.assertEquals(2, numPuts.getCount());
- Assert.assertEquals(1, numDeletes.getCount());
- Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
- Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeleteAlls.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testPutAll() throws Exception {
- ReadWriteTable table = createTable(false);
- List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
- table.putAll(entries);
- table.putAllAsync(entries).get();
- verify(kvStore, times(2)).putAll(any());
- verify(kvStore, times(2)).deleteAll(any());
- Assert.assertEquals(2, numPutAlls.getCount());
- Assert.assertEquals(2, numDeleteAlls.getCount());
- Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
- Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numDeletes.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testDelete() throws Exception {
- ReadWriteTable table = createTable(false);
- table.delete("");
- table.deleteAsync("").get();
- verify(kvStore, times(2)).delete(any());
- Assert.assertEquals(2, numDeletes.getCount());
- Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeleteAlls.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testDeleteAll() throws Exception {
- ReadWriteTable table = createTable(false);
- table.deleteAll(Collections.emptyList());
- table.deleteAllAsync(Collections.emptyList()).get();
- verify(kvStore, times(2)).deleteAll(any());
- Assert.assertEquals(2, numDeleteAlls.getCount());
- Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeletes.getCount());
- Assert.assertEquals(0, numFlushes.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testFlush() {
- ReadWriteTable table = createTable(false);
- table.flush();
- table.flush();
- verify(kvStore, times(2)).flush();
- Assert.assertEquals(2, numFlushes.getCount());
- Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(0, numDeletes.getCount());
- Assert.assertEquals(0, numDeleteAlls.getCount());
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testTimerDisabled() throws Exception {
- ReadWriteTable table = createTable(true);
- table.put("", "");
- table.putAsync("", "").get();
- table.putAll(Collections.emptyList());
- table.putAllAsync(Collections.emptyList()).get();
- table.delete("");
- table.deleteAsync("").get();
- table.deleteAll(Collections.emptyList());
- table.deleteAllAsync(Collections.emptyList()).get();
- table.flush();
- Assert.assertEquals(1, numFlushes.getCount());
- Assert.assertEquals(2, numPuts.getCount());
- Assert.assertEquals(0, numPutAlls.getCount());
- Assert.assertEquals(2, numDeletes.getCount());
- Assert.assertEquals(2, numDeleteAlls.getCount());
- Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- private LocalReadWriteTable createTable(boolean isTimerDisabled) {
- Map<String, String> config = new HashMap<>();
- if (isTimerDisabled) {
- config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
- }
- Context context = mock(Context.class);
- JobContext jobContext = mock(JobContext.class);
- when(context.getJobContext()).thenReturn(jobContext);
- when(jobContext.getConfig()).thenReturn(new MapConfig(config));
- ContainerContext containerContext = mock(ContainerContext.class);
- when(context.getContainerContext()).thenReturn(containerContext);
- when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
- LocalReadWriteTable table = new LocalReadWriteTable("t1", kvStore);
- table.init(context);
-
- return table;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
deleted file mode 100644
index e1c82d9..0000000
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.ContainerContext;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.JobContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadableTable;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-public class TestLocalReadableTable {
-
- public static final String TABLE_ID = "t1";
-
- private List<String> keys;
- private Map<String, String> values;
-
- private Timer getNs;
- private Timer getAllNs;
- private Counter numGets;
- private Counter numGetAlls;
- private Timer getCallbackNs;
- private Counter numMissedLookups;
-
- private MetricsRegistry metricsRegistry;
-
- private KeyValueStore kvStore;
-
- @Before
- public void setUp() {
- keys = Arrays.asList("k1", "k2", "k3");
-
- values = new HashMap<>();
- values.put("k1", "v1");
- values.put("k2", "v2");
- values.put("k3", null);
-
- kvStore = mock(KeyValueStore.class);
- when(kvStore.get("k1")).thenReturn("v1");
- when(kvStore.get("k2")).thenReturn("v2");
- when(kvStore.getAll(keys)).thenReturn(values);
-
- getNs = new Timer("");
- getAllNs = new Timer("");
- numGets = new Counter("");
- numGetAlls = new Counter("");
- getCallbackNs = new Timer("");
- numMissedLookups = new Counter("");
-
- metricsRegistry = mock(MetricsRegistry.class);
- String groupName = LocalReadableTable.class.getSimpleName();
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
- when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
- when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
- }
-
- @Test
- public void testGet() throws Exception {
- ReadableTable table = createTable(false);
- Assert.assertEquals("v1", table.get("k1"));
- Assert.assertEquals("v2", table.getAsync("k2").get());
- Assert.assertNull(table.get("k3"));
- verify(kvStore, times(3)).get(any());
- Assert.assertEquals(3, numGets.getCount());
- Assert.assertEquals(1, numMissedLookups.getCount());
- Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, numGetAlls.getCount());
- Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testGetAll() throws Exception {
- ReadableTable table = createTable(false);
- Assert.assertEquals(values, table.getAll(keys));
- Assert.assertEquals(values, table.getAllAsync(keys).get());
- verify(kvStore, times(2)).getAll(any());
- Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
- Assert.assertEquals(2, numMissedLookups.getCount());
- Assert.assertEquals(3, numGetAlls.getCount());
- Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
- Assert.assertEquals(0, numGets.getCount());
- Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- @Test
- public void testTimerDisabled() throws Exception {
- ReadableTable table = createTable(true);
- table.get("");
- table.getAsync("").get();
- table.getAll(keys);
- table.getAllAsync(keys).get();
- Assert.assertEquals(2, numGets.getCount());
- Assert.assertEquals(4, numMissedLookups.getCount());
- Assert.assertEquals(2, numGetAlls.getCount());
- Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
- Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
- }
-
- private LocalReadableTable createTable(boolean isTimerDisabled) {
- Map<String, String> config = new HashMap<>();
- if (isTimerDisabled) {
- config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
- }
- Context context = mock(Context.class);
- JobContext jobContext = mock(JobContext.class);
- when(context.getJobContext()).thenReturn(jobContext);
- when(jobContext.getConfig()).thenReturn(new MapConfig(config));
- ContainerContext containerContext = mock(ContainerContext.class);
- when(context.getContainerContext()).thenReturn(containerContext);
- when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
-
- LocalReadableTable table = new LocalReadableTable("t1", kvStore);
- table.init(context);
-
- return table;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
index 5367931..263ab56 100644
--- a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
@@ -19,7 +19,6 @@
package org.apache.samza.storage.kv;
-import junit.framework.Assert;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
@@ -28,6 +27,7 @@ import org.apache.samza.context.TaskContext;
import org.apache.samza.table.TableProvider;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Test;
+import org.junit.Assert;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
new file mode 100644
index 0000000..0fd4539
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for the read operations ({@code get}, {@code getAsync}, {@code getAll},
+ * {@code getAllAsync}) of {@link LocalTable}, together with the counters and timers the
+ * table obtains from the container metrics registry.
+ *
+ * <p>The backing {@link KeyValueStore} and the {@link MetricsRegistry} are Mockito mocks.
+ * The registry mock hands back the real {@link Counter} and {@link Timer} instances held
+ * in the fields below, so each test can inspect them after exercising the table.
+ */
+public class TestLocalTableRead {
+
+  /** Table id used to construct the table and to derive the stubbed metric names. */
+  public static final String TABLE_ID = "t1";
+
+  private List<String> keys;
+  private Map<String, String> values;
+
+  private Timer getNs;
+  private Timer getAllNs;
+  private Counter numGets;
+  private Counter numGetAlls;
+  private Timer getCallbackNs;
+  private Counter numMissedLookups;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  /**
+   * Builds the fixtures shared by every test: three keys of which {@code k3} maps to
+   * {@code null}, a mocked store answering those keys, and a mocked metrics registry that
+   * returns this class's counters and timers for the {@code LocalTable} metric group.
+   */
+  @Before
+  public void setUp() {
+    keys = Arrays.asList("k1", "k2", "k3");
+
+    values = new HashMap<>();
+    values.put("k1", "v1");
+    values.put("k2", "v2");
+    values.put("k3", null);
+
+    kvStore = mock(KeyValueStore.class);
+    when(kvStore.get("k1")).thenReturn("v1");
+    when(kvStore.get("k2")).thenReturn("v2");
+    // "k3" is left unstubbed, so the mock's default answer (null) is returned for it.
+    when(kvStore.getAll(keys)).thenReturn(values);
+
+    getNs = new Timer("");
+    getAllNs = new Timer("");
+    numGets = new Counter("");
+    numGetAlls = new Counter("");
+    getCallbackNs = new Timer("");
+    numMissedLookups = new Counter("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalTable.class.getSimpleName();
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-ns")).thenReturn(getNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-getAll-ns")).thenReturn(getAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-get-callback-ns")).thenReturn(getCallbackNs);
+  }
+
+  /**
+   * Performs two synchronous and one asynchronous single-key lookup and expects three
+   * store reads, three counted gets, one missed lookup (for {@code k3}), a non-zero get
+   * timer, and untouched getAll and callback metrics.
+   *
+   * @throws Exception if waiting on the asynchronous lookup fails
+   */
+  @Test
+  public void testGet() throws Exception {
+    ReadWriteTable table = createTable(false);
+    Assert.assertEquals("v1", table.get("k1"));
+    Assert.assertEquals("v2", table.getAsync("k2").get());
+    Assert.assertNull(table.get("k3"));
+    verify(kvStore, times(3)).get(any());
+    Assert.assertEquals(3, numGets.getCount());
+    Assert.assertEquals(1, numMissedLookups.getCount());
+    Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGetAlls.getCount());
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Performs a synchronous and an asynchronous multi-key lookup plus one lookup with an
+   * empty key list, and expects three counted getAlls, two missed lookups (one per call
+   * that contained {@code k3}), a non-zero getAll timer, and untouched single-get and
+   * callback metrics.
+   *
+   * @throws Exception if waiting on the asynchronous lookup fails
+   */
+  @Test
+  public void testGetAll() throws Exception {
+    ReadWriteTable table = createTable(false);
+    Assert.assertEquals(values, table.getAll(keys));
+    Assert.assertEquals(values, table.getAllAsync(keys).get());
+    // Verified before the empty-key lookup below, hence only two store calls so far.
+    verify(kvStore, times(2)).getAll(any());
+    Assert.assertEquals(Collections.emptyMap(), table.getAll(Collections.emptyList()));
+    Assert.assertEquals(2, numMissedLookups.getCount());
+    Assert.assertEquals(3, numGetAlls.getCount());
+    Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGets.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Runs every read operation once with the metrics timer switched off in the job config
+   * and expects the counters to advance while all timers stay at zero.
+   *
+   * @throws Exception if waiting on an asynchronous lookup fails
+   */
+  @Test
+  public void testTimerDisabled() throws Exception {
+    ReadWriteTable table = createTable(true);
+    table.get("");
+    table.getAsync("").get();
+    table.getAll(keys);
+    table.getAllAsync(keys).get();
+    Assert.assertEquals(2, numGets.getCount());
+    // Two unstubbed single-key lookups plus one null value ("k3") in each getAll call.
+    Assert.assertEquals(4, numMissedLookups.getCount());
+    Assert.assertEquals(2, numGetAlls.getCount());
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Creates and initializes a {@link LocalTable} over the mocked store, wired to a mocked
+   * {@link Context} whose container context exposes the mocked metrics registry.
+   *
+   * @param isTimerDisabled when {@code true}, the job config sets the metrics timer
+   *                        enabled flag to {@code "false"}; otherwise the config is empty
+   * @return the initialized table
+   */
+  private LocalTable createTable(boolean isTimerDisabled) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+    // Use TABLE_ID rather than a literal so the table id always matches the metric
+    // names stubbed in setUp().
+    LocalTable table = new LocalTable(TABLE_ID, kvStore);
+    table.init(context);
+
+    return table;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
new file mode 100644
index 0000000..80eb99f
--- /dev/null
+++ b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadWriteTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for the write operations ({@code put}, {@code putAll}, {@code delete},
+ * {@code deleteAll}, their asynchronous variants, and {@code flush}) of
+ * {@link LocalTable}, together with the counters and timers the table obtains from the
+ * container metrics registry.
+ *
+ * <p>The backing {@link KeyValueStore} and the {@link MetricsRegistry} are Mockito mocks.
+ * The registry mock hands back the real {@link Counter} and {@link Timer} instances held
+ * in the fields below, so each test can inspect them after exercising the table.
+ */
+public class TestLocalTableWrite {
+
+  /** Table id used to construct the table and to derive the stubbed metric names. */
+  public static final String TABLE_ID = "t1";
+
+  private Timer putNs;
+  private Timer putAllNs;
+  private Timer deleteNs;
+  private Timer deleteAllNs;
+  private Timer flushNs;
+  private Counter numPuts;
+  private Counter numPutAlls;
+  private Counter numDeletes;
+  private Counter numDeleteAlls;
+  private Counter numFlushes;
+  private Timer putCallbackNs;
+  private Timer deleteCallbackNs;
+
+  private MetricsRegistry metricsRegistry;
+
+  private KeyValueStore kvStore;
+
+  /**
+   * Creates fresh metric instances, a mocked metrics registry that returns them for the
+   * {@code LocalTable} metric group, and an unstubbed mocked store.
+   */
+  @Before
+  public void setUp() {
+    putNs = new Timer("");
+    putAllNs = new Timer("");
+    deleteNs = new Timer("");
+    deleteAllNs = new Timer("");
+    flushNs = new Timer("");
+    numPuts = new Counter("");
+    numPutAlls = new Counter("");
+    numDeletes = new Counter("");
+    numDeleteAlls = new Counter("");
+    numFlushes = new Counter("");
+    putCallbackNs = new Timer("");
+    deleteCallbackNs = new Timer("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalTable.class.getSimpleName();
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-deleteAll-ns")).thenReturn(deleteAllNs);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-puts")).thenReturn(numPuts);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-putAlls")).thenReturn(numPutAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deletes")).thenReturn(numDeletes);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-deleteAlls")).thenReturn(numDeleteAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-flushes")).thenReturn(numFlushes);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-callback-ns")).thenReturn(putCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-callback-ns")).thenReturn(deleteCallbackNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + "-flush-ns")).thenReturn(flushNs);
+
+    kvStore = mock(KeyValueStore.class);
+  }
+
+  /**
+   * Puts two non-null values and one {@code null} value and expects two store puts and
+   * one store delete, with only the put and delete metrics updated.
+   *
+   * @throws Exception if waiting on an asynchronous put fails
+   */
+  @Test
+  public void testPut() throws Exception {
+    ReadWriteTable table = createTable(false);
+    table.put("k1", "v1");
+    table.putAsync("k2", "v2").get();
+    // The null-valued put is expected to reach the store as a delete (verified below).
+    table.putAsync("k3", null).get();
+    verify(kvStore, times(2)).put(any(), any());
+    verify(kvStore, times(1)).delete(any());
+    Assert.assertEquals(2, numPuts.getCount());
+    Assert.assertEquals(1, numDeletes.getCount());
+    Assert.assertTrue(putNs.getSnapshot().getAverage() > 0);
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Writes the same batch (one non-null value, one {@code null} value) synchronously and
+   * asynchronously and expects each call to produce one store putAll and one store
+   * deleteAll, with only the putAll and deleteAll metrics updated.
+   *
+   * @throws Exception if waiting on the asynchronous putAll fails
+   */
+  @Test
+  public void testPutAll() throws Exception {
+    ReadWriteTable table = createTable(false);
+    List<Entry> entries = Arrays.asList(new Entry("k1", "v1"), new Entry("k2", null));
+    table.putAll(entries);
+    table.putAllAsync(entries).get();
+    verify(kvStore, times(2)).putAll(any());
+    verify(kvStore, times(2)).deleteAll(any());
+    Assert.assertEquals(2, numPutAlls.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertTrue(putAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Deletes a key synchronously and asynchronously and expects two store deletes, with
+   * only the delete metrics updated.
+   *
+   * @throws Exception if waiting on the asynchronous delete fails
+   */
+  @Test
+  public void testDelete() throws Exception {
+    ReadWriteTable table = createTable(false);
+    table.delete("");
+    table.deleteAsync("").get();
+    verify(kvStore, times(2)).delete(any());
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertTrue(deleteNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Issues a synchronous and an asynchronous deleteAll with an empty key list and expects
+   * two store deleteAll calls, with only the deleteAll metrics updated.
+   *
+   * @throws Exception if waiting on the asynchronous deleteAll fails
+   */
+  @Test
+  public void testDeleteAll() throws Exception {
+    ReadWriteTable table = createTable(false);
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get();
+    verify(kvStore, times(2)).deleteAll(any());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertTrue(deleteAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numFlushes.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Flushes the table twice and expects two store flushes, with only the flush metrics
+   * updated.
+   */
+  @Test
+  public void testFlush() {
+    ReadWriteTable table = createTable(false);
+    table.flush();
+    table.flush();
+    verify(kvStore, times(2)).flush();
+    Assert.assertEquals(2, numFlushes.getCount());
+    Assert.assertTrue(flushNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, numPuts.getCount());
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(0, numDeletes.getCount());
+    Assert.assertEquals(0, numDeleteAlls.getCount());
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Runs every write operation with the metrics timer switched off in the job config and
+   * expects the counters to advance while all timers stay at zero.
+   *
+   * @throws Exception if waiting on an asynchronous operation fails
+   */
+  @Test
+  public void testTimerDisabled() throws Exception {
+    ReadWriteTable table = createTable(true);
+    table.put("", "");
+    table.putAsync("", "").get();
+    table.putAll(Collections.emptyList());
+    table.putAllAsync(Collections.emptyList()).get();
+    table.delete("");
+    table.deleteAsync("").get();
+    table.deleteAll(Collections.emptyList());
+    table.deleteAllAsync(Collections.emptyList()).get();
+    table.flush();
+    Assert.assertEquals(1, numFlushes.getCount());
+    Assert.assertEquals(2, numPuts.getCount());
+    // The two putAll calls above pass empty lists and are expected not to be counted.
+    Assert.assertEquals(0, numPutAlls.getCount());
+    Assert.assertEquals(2, numDeletes.getCount());
+    Assert.assertEquals(2, numDeleteAlls.getCount());
+    Assert.assertEquals(0, flushNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, putCallbackNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  /**
+   * Creates and initializes a {@link LocalTable} over the mocked store, wired to a mocked
+   * {@link Context} whose container context exposes the mocked metrics registry.
+   *
+   * @param isTimerDisabled when {@code true}, the job config sets the metrics timer
+   *                        enabled flag to {@code "false"}; otherwise the config is empty
+   * @return the initialized table
+   */
+  private LocalTable createTable(boolean isTimerDisabled) {
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
+
+    // Use TABLE_ID rather than a literal so the table id always matches the metric
+    // names stubbed in setUp().
+    LocalTable table = new LocalTable(TABLE_ID, kvStore);
+    table.init(context);
+
+    return table;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 2137a46..ab650f2 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -229,7 +229,7 @@ public class StreamTaskIntegrationTest {
@Override
public void init(Context context) throws Exception {
- profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store");
+ profileViewTable = context.getTaskContext().getTable("profile-view-store");
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
deleted file mode 100644
index b447493..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.TaskApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.context.Context;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.StreamTaskFactory;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
-import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * This test class tests sendTo() and join() for local tables
- */
-public class TestLocalTable extends AbstractIntegrationTestHarness {
-
- @Test
- public void testSendTo() throws Exception {
-
- int count = 10;
- Profile[] profiles = TestTableData.generateProfiles(count);
-
- int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.Profile.samza.system", "test");
- configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
- MyMapFunction mapFn = new MyMapFunction();
-
- final StreamApplication app = appDesc -> {
-
- Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
- KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
- appDesc.getInputStream(isd)
- .map(mapFn)
- .sendTo(table);
- };
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- for (int i = 0; i < partitionCount; i++) {
- MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
- assertEquals(count, mapFnCopy.received.size());
- mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
- }
- }
-
- static class StreamTableJoinApp implements StreamApplication {
- static List<PageView> received = new LinkedList<>();
- static List<EnrichedPageView> joined = new LinkedList<>();
-
- @Override
- public void describe(StreamApplicationDescriptor appDesc) {
- Table<KV<Integer, Profile>> table = appDesc.getTable(
- new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
- appDesc.getInputStream(profileISD)
- .map(m -> new KV(m.getMemberId(), m))
- .sendTo(table);
-
- GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDesc.getInputStream(pageViewISD)
- .map(pv -> {
- received.add(pv);
- return pv;
- })
- .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
- .join(table, new PageViewToProfileJoinFunction())
- .sink((m, collector, coordinator) -> joined.add(m));
- }
- }
-
- @Test
- public void testStreamTableJoin() throws Exception {
-
- int count = 10;
- PageView[] pageViews = TestTableData.generatePageViews(count);
- Profile[] profiles = TestTableData.generateProfiles(count);
-
- int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.Profile.samza.system", "test");
- configs.put("streams.Profile.samza.bootstrap", "true");
- configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
- assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
- assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
- }
-
- static class DualStreamTableJoinApp implements StreamApplication {
- static List<Profile> sentToProfileTable1 = new LinkedList<>();
- static List<Profile> sentToProfileTable2 = new LinkedList<>();
- static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
- static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
-
- @Override
- public void describe(StreamApplicationDescriptor appDesc) {
- KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
- KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
-
- PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
- PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
-
- Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
-
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
- GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
- MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
- MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
-
- profileStream1
- .map(m -> {
- sentToProfileTable1.add(m);
- return new KV(m.getMemberId(), m);
- })
- .sendTo(profileTable);
- profileStream2
- .map(m -> {
- sentToProfileTable2.add(m);
- return new KV(m.getMemberId(), m);
- })
- .sendTo(profileTable);
-
- GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
- GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
- MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
- MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
-
- pageViewStream1
- .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
- .join(profileTable, joinFn1)
- .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
-
- pageViewStream2
- .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
- .join(profileTable, joinFn2)
- .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
- }
- }
-
- @Test
- public void testDualStreamTableJoin() throws Exception {
-
- int count = 10;
- PageView[] pageViews = TestTableData.generatePageViews(count);
- Profile[] profiles = TestTableData.generateProfiles(count);
-
- int partitionCount = 4;
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.Profile1.samza.system", "test");
- configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile1.samza.bootstrap", "true");
- configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.Profile2.samza.system", "test");
- configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
- configs.put("streams.Profile2.samza.bootstrap", "true");
- configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.PageView1.samza.system", "test");
- configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
- configs.put("streams.PageView2.samza.system", "test");
- configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
- assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
- assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
- assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
- assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
- assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
- }
-
- static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
- Map<String, String> configs = new HashMap<>();
- configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
-
- configs.put(JobConfig.JOB_NAME(), "test-table-job");
- configs.put(JobConfig.PROCESSOR_ID(), "1");
- configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
- configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
-
- // For intermediate streams
- configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
- configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
- configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
- configs.put("systems.kafka.samza.key.serde", "int");
- configs.put("systems.kafka.samza.msg.serde", "json");
- configs.put("systems.kafka.default.stream.replication.factor", "1");
- configs.put("job.default.system", "kafka");
-
- configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
- configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
-
- return configs;
- }
-
- private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
- private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
-
- private transient List<Profile> received;
- private transient ReadableTable table;
-
- @Override
- public void init(Context context) {
- table = (ReadableTable) context.getTaskContext().getTable("t1");
- this.received = new ArrayList<>();
-
- taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
- }
-
- @Override
- public KV<Integer, Profile> apply(Profile profile) {
- received.add(profile);
- return new KV(profile.getMemberId(), profile);
- }
-
- public static MyMapFunction getMapFunctionByTask(String taskName) {
- return taskToMapFunctionMap.get(taskName);
- }
- }
-
- @Test
- public void testWithLowLevelApi() throws Exception {
-
- Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
- configs.put("streams.PageView.partitionCount", String.valueOf(4));
- configs.put("task.inputs", "test.PageView");
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(new MyTaskApplication(), config);
- executeRun(runner, config);
- runner.waitForFinish();
- }
-
- static public class MyTaskApplication implements TaskApplication {
- @Override
- public void describe(TaskApplicationDescriptor appDescriptor) {
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDescriptor
- .withInputStream(pageViewISD)
- .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
- .withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
- }
- }
-
- static public class MyStreamTask implements StreamTask, InitableTask {
- private ReadWriteTable<Integer, PageView> pageViewTable;
- @Override
- public void init(Context context) throws Exception {
- pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1");
- }
- @Override
- public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
- PageView pv = (PageView) message.getMessage();
- pageViewTable.put(pv.getMemberId(), pv);
- PageView pv2 = pageViewTable.get(pv.getMemberId());
- Assert.assertEquals(pv.getMemberId(), pv2.getMemberId());
- Assert.assertEquals(pv.getPageKey(), pv2.getPageKey());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
new file mode 100644
index 0000000..0303c26
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.context.Context;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.util.ArraySystemFactory;
+import org.apache.samza.test.util.Base64Serializer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import static org.apache.samza.test.table.TestTableData.PageView;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
+import static org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
+import static org.apache.samza.test.table.TestTableData.Profile;
+import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests sendTo(), stream-table join() and low-level task API access for local (in-memory) tables.
+ */
+public class TestLocalTableEndToEnd extends AbstractIntegrationTestHarness {
+
+  /**
+   * Sends the Profile stream through {@link MyMapFunction} into in-memory table "t1", then checks
+   * per task that every profile the task's function received can be read back from the table.
+   */
+  @Test
+  public void testSendTo() throws Exception {
+    int count = 10;
+    Profile[] profiles = TestTableData.generateProfiles(count);
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    configs.put("streams.Profile.samza.system", "test");
+    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+    MyMapFunction mapFn = new MyMapFunction();
+    final StreamApplication app = appDesc -> {
+      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
+          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+      appDesc.getInputStream(isd)
+          .map(mapFn)
+          .sendTo(table);
+    };
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    executeRun(runner, config);
+    runner.waitForFinish();
+
+    for (int i = 0; i < partitionCount; i++) {
+      MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
+      Assert.assertNotNull("No map function was initialized for partition " + i, mapFnCopy);
+      assertEquals(count, mapFnCopy.received.size());
+      mapFnCopy.received.forEach(p -> Assert.assertNotNull(
+          "Profile " + p.getMemberId() + " is missing from table t1", mapFnCopy.table.get(p.getMemberId())));
+    }
+  }
+
+  /** Application that loads Profiles into in-memory table "t1" and joins PageViews against it. */
+  static class StreamTableJoinApp implements StreamApplication {
+    static List<PageView> received = new LinkedList<>(); // page views seen before repartitioning
+    static List<EnrichedPageView> joined = new LinkedList<>(); // results emitted by the join
+
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      KVSerde<Integer, Profile> profileSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
+      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileSerde));
+      DelegatingSystemDescriptor testSystem = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileInput = testSystem.getInputDescriptor("Profile", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewInput = testSystem.getInputDescriptor("PageView", new NoOpSerde<>());
+      // Profiles go into the table keyed by member id; page views are re-keyed the same way, then joined.
+      MessageStream<Profile> profileStream = appDesc.getInputStream(profileInput);
+      profileStream.map(profile -> new KV(profile.getMemberId(), profile)).sendTo(profileTable);
+      appDesc.getInputStream(pageViewInput)
+          .map(pageView -> {
+            received.add(pageView);
+            return pageView;
+          })
+          .partitionBy(PageView::getMemberId, pv -> pv, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .join(profileTable, new PageViewToProfileJoinFunction())
+          .sink((enriched, collector, coordinator) -> joined.add(enriched));
+    }
+  }
+
+  /** Runs {@link StreamTableJoinApp} and checks that every page view was both received and joined. */
+  @Test
+  public void testStreamTableJoin() throws Exception {
+    final int numRecords = 10;
+    final int numPartitions = 4;
+    String pageViewSource = Base64Serializer.serialize(TestTableData.generatePageViews(numRecords));
+    String profileSource = Base64Serializer.serialize(TestTableData.generateProfiles(numRecords));
+
+    Map<String, String> jobConfigs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    jobConfigs.put("streams.Profile.samza.system", "test");
+    jobConfigs.put("streams.Profile.samza.bootstrap", "true");
+    jobConfigs.put("streams.Profile.source", profileSource);
+    jobConfigs.put("streams.Profile.partitionCount", String.valueOf(numPartitions));
+
+    jobConfigs.put("streams.PageView.samza.system", "test");
+    jobConfigs.put("streams.PageView.source", pageViewSource);
+    jobConfigs.put("streams.PageView.partitionCount", String.valueOf(numPartitions));
+
+    Config jobConfig = new MapConfig(jobConfigs);
+    final LocalApplicationRunner appRunner = new LocalApplicationRunner(new StreamTableJoinApp(), jobConfig);
+    executeRun(appRunner, jobConfig);
+    appRunner.waitForFinish();
+
+    int expectedTotal = numRecords * numPartitions; // the assertions expect numRecords per partition
+    assertEquals(expectedTotal, StreamTableJoinApp.received.size());
+    assertEquals(expectedTotal, StreamTableJoinApp.joined.size());
+    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+  }
+
+  /** Application in which two Profile streams fill one table and two PageView streams join against it. */
+  static class DualStreamTableJoinApp implements StreamApplication {
+    static List<Profile> sentToProfileTable1 = new LinkedList<>();
+    static List<Profile> sentToProfileTable2 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
+    static List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
+
+    @Override
+    public void describe(StreamApplicationDescriptor appDesc) {
+      KVSerde<Integer, Profile> profileSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
+      KVSerde<Integer, PageView> pageViewSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
+      Table<KV<Integer, Profile>> sharedTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileSerde));
+
+      DelegatingSystemDescriptor testSystem = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<Profile> profileInput1 = testSystem.getInputDescriptor("Profile1", new NoOpSerde<>());
+      GenericInputDescriptor<Profile> profileInput2 = testSystem.getInputDescriptor("Profile2", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewInput1 = testSystem.getInputDescriptor("PageView1", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewInput2 = testSystem.getInputDescriptor("PageView2", new NoOpSerde<>());
+
+      // Both profile streams write to the same table; each map step records what it forwarded.
+      MessageStream<Profile> profiles1 = appDesc.getInputStream(profileInput1);
+      MessageStream<Profile> profiles2 = appDesc.getInputStream(profileInput2);
+
+      profiles1
+          .map(profile -> {
+            sentToProfileTable1.add(profile);
+            return new KV(profile.getMemberId(), profile);
+          })
+          .sendTo(sharedTable);
+
+      profiles2
+          .map(profile -> {
+            sentToProfileTable2.add(profile);
+            return new KV(profile.getMemberId(), profile);
+          })
+          .sendTo(sharedTable);
+
+      // Each page view stream is re-keyed by member id, joined with the shared table and collected.
+      MessageStream<PageView> pageViews1 = appDesc.getInputStream(pageViewInput1);
+      MessageStream<PageView> pageViews2 = appDesc.getInputStream(pageViewInput2);
+      pageViews1
+          .partitionBy(PageView::getMemberId, pv -> pv, pageViewSerde, "p1")
+          .join(sharedTable, new PageViewToProfileJoinFunction())
+          .sink((enriched, collector, coordinator) -> joinedPageViews1.add(enriched));
+
+      pageViews2
+          .partitionBy(PageView::getMemberId, pv -> pv, pageViewSerde, "p2")
+          .join(sharedTable, new PageViewToProfileJoinFunction())
+          .sink((enriched, collector, coordinator) -> joinedPageViews2.add(enriched));
+    }
+  }
+
+  /** Runs {@link DualStreamTableJoinApp} and checks the per-stream counts of forwarded profiles and joins. */
+  @Test
+  public void testDualStreamTableJoin() throws Exception {
+    final int numRecords = 10;
+    final int numPartitions = 4;
+    String pageViewSource = Base64Serializer.serialize(TestTableData.generatePageViews(numRecords));
+    String profileSource = Base64Serializer.serialize(TestTableData.generateProfiles(numRecords));
+    String partitions = String.valueOf(numPartitions);
+
+    Map<String, String> jobConfigs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    jobConfigs.put("streams.PageView1.samza.system", "test");
+    jobConfigs.put("streams.PageView1.source", pageViewSource);
+    jobConfigs.put("streams.PageView1.partitionCount", partitions);
+
+    jobConfigs.put("streams.PageView2.samza.system", "test");
+    jobConfigs.put("streams.PageView2.source", pageViewSource);
+    jobConfigs.put("streams.PageView2.partitionCount", partitions);
+
+    jobConfigs.put("streams.Profile1.samza.system", "test");
+    jobConfigs.put("streams.Profile1.samza.bootstrap", "true");
+    jobConfigs.put("streams.Profile1.source", profileSource);
+    jobConfigs.put("streams.Profile1.partitionCount", partitions);
+
+    jobConfigs.put("streams.Profile2.samza.system", "test");
+    jobConfigs.put("streams.Profile2.samza.bootstrap", "true");
+    jobConfigs.put("streams.Profile2.source", profileSource);
+    jobConfigs.put("streams.Profile2.partitionCount", partitions);
+
+    Config jobConfig = new MapConfig(jobConfigs);
+    final LocalApplicationRunner appRunner = new LocalApplicationRunner(new DualStreamTableJoinApp(), jobConfig);
+    executeRun(appRunner, jobConfig);
+    appRunner.waitForFinish();
+
+    int expectedTotal = numRecords * numPartitions; // the assertions expect numRecords per partition
+    assertEquals(expectedTotal, DualStreamTableJoinApp.sentToProfileTable1.size());
+    assertEquals(expectedTotal, DualStreamTableJoinApp.sentToProfileTable2.size());
+    assertEquals(expectedTotal, DualStreamTableJoinApp.joinedPageViews1.size());
+    assertEquals(expectedTotal, DualStreamTableJoinApp.joinedPageViews2.size());
+    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
+    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+  }
+
+  /** Builds the base job config: the "test" array system, job settings, serdes and a default Kafka system. */
+  static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
+    Map<String, String> base = new HashMap<>();
+    base.put(JobConfig.JOB_NAME(), "test-table-job");
+    base.put(JobConfig.PROCESSOR_ID(), "1");
+    base.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    base.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    // Factory for the "test" system, plus the serde registry entries named "int" and "json"
+    base.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
+    base.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
+    base.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
+
+    // For intermediate streams
+    base.put("job.default.system", "kafka");
+    base.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    base.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl);
+    base.put("systems.kafka.consumer.zookeeper.connect", zkConnect);
+    base.put("systems.kafka.samza.key.serde", "int");
+    base.put("systems.kafka.samza.msg.serde", "json");
+    base.put("systems.kafka.default.stream.replication.factor", "1");
+    return base;
+  }
+
+  /** Map function that keys each Profile by member id and records the profiles its task has seen. */
+  private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
+    // Instances register themselves here by task name in init() so a test can look them up afterwards.
+    private static final Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+
+    private transient List<Profile> received;
+    private transient ReadWriteTable<Integer, Profile> table;
+
+    @Override
+    public void init(Context context) {
+      table = context.getTaskContext().getTable("t1");
+      this.received = new ArrayList<>();
+      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+    }
+
+    @Override
+    public KV<Integer, Profile> apply(Profile profile) {
+      received.add(profile);
+      return new KV<>(profile.getMemberId(), profile);
+    }
+
+    public static MyMapFunction getMapFunctionByTask(String taskName) {
+      return taskToMapFunctionMap.get(taskName);
+    }
+  }
+
+  /** Runs {@link MyTaskApplication} over PageView input; the assertions are made in {@link MyStreamTask}. */
+  @Test
+  public void testWithLowLevelApi() throws Exception {
+    Map<String, String> jobConfigs = getBaseJobConfig(bootstrapUrl(), zkConnect());
+    jobConfigs.put("task.inputs", "test.PageView");
+    jobConfigs.put("streams.PageView.samza.system", "test");
+    jobConfigs.put("streams.PageView.partitionCount", String.valueOf(4));
+    jobConfigs.put("streams.PageView.source", Base64Serializer.serialize(TestTableData.generatePageViews(10)));
+
+    Config jobConfig = new MapConfig(jobConfigs);
+    final LocalApplicationRunner appRunner = new LocalApplicationRunner(new MyTaskApplication(), jobConfig);
+    executeRun(appRunner, jobConfig);
+    appRunner.waitForFinish();
+  }
+
+  /** Task application with PageView input and in-memory table "t1"; its tasks are {@link MyStreamTask}s. */
+  public static class MyTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor testSystem = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<PageView> pageViewInput = testSystem.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDescriptor.withInputStream(pageViewInput)
+          .withTable(new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new PageViewJsonSerde())))
+          .withTaskFactory((StreamTaskFactory) MyStreamTask::new);
+    }
+  }
+
+  public static class MyStreamTask implements StreamTask, InitableTask {
+    private ReadWriteTable<Integer, PageView> pageViewTable; // table "t1", looked up in init()
+    @Override
+    public void init(Context context) throws Exception {
+      this.pageViewTable = context.getTaskContext().getTable("t1");
+    }
+    @Override
+    public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+      PageView incoming = (PageView) message.getMessage();
+      pageViewTable.put(incoming.getMemberId(), incoming);
+      PageView stored = pageViewTable.get(incoming.getMemberId()); // read back what was just written
+      assertEquals(incoming.getMemberId(), stored.getMemberId());
+      assertEquals(incoming.getPageKey(), stored.getPageKey());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
deleted file mode 100644
index 3de8300..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import com.google.common.cache.CacheBuilder;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.table.Table;
-import org.apache.samza.table.descriptors.CachingTableDescriptor;
-import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
-import org.apache.samza.table.remote.RemoteReadableTable;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.remote.TableRateLimiter;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
-import org.apache.samza.test.util.Base64Serializer;
-import org.apache.samza.util.RateLimiter;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
-import static org.apache.samza.test.table.TestTableData.PageView;
-import static org.apache.samza.test.table.TestTableData.Profile;
-import static org.apache.samza.test.table.TestTableData.generatePageViews;
-import static org.apache.samza.test.table.TestTableData.generateProfiles;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.withSettings;
-
-
-public class TestRemoteTable extends AbstractIntegrationTestHarness {
-
- static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
-
- static class InMemoryReadFunction implements TableReadFunction<Integer, Profile> {
- private final String serializedProfiles;
- private transient Map<Integer, Profile> profileMap;
-
- private InMemoryReadFunction(String profiles) {
- this.serializedProfiles = profiles;
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
- this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
- }
-
- @Override
- public CompletableFuture<Profile> getAsync(Integer key) {
- return CompletableFuture.completedFuture(profileMap.get(key));
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
-
- static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) {
- return new InMemoryReadFunction(serializedProfiles);
- }
- }
-
- static class InMemoryWriteFunction implements TableWriteFunction<Integer, EnrichedPageView> {
- private transient List<EnrichedPageView> records;
- private String testName;
-
- public InMemoryWriteFunction(String testName) {
- this.testName = testName;
- }
-
- // Verify serializable functionality
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
-
- // Write to the global list for verification
- records = writtenRecords.get(testName);
- }
-
- @Override
- public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
- records.add(record);
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> deleteAsync(Integer key) {
- records.remove(key);
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
- }
-
- private <K, V> Table<KV<K, V>> getCachingTable(TableDescriptor<K, V, ?> actualTableDesc, boolean defaultCache, String id, StreamApplicationDescriptor appDesc) {
- CachingTableDescriptor<K, V> cachingDesc;
- if (defaultCache) {
- cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc);
- cachingDesc.withReadTtl(Duration.ofMinutes(5));
- cachingDesc.withWriteTtl(Duration.ofMinutes(5));
- } else {
- GuavaCacheTableDescriptor<K, V> guavaTableDesc = new GuavaCacheTableDescriptor<>("guava-table-" + id);
- guavaTableDesc.withCache(CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build());
- cachingDesc = new CachingTableDescriptor<>("caching-table-" + id, actualTableDesc, guavaTableDesc);
- }
-
- return appDesc.getTable(cachingDesc);
- }
-
- static class MyReadFunction implements TableReadFunction {
- @Override
- public CompletableFuture getAsync(Object key) {
- return null;
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
- }
-
- private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception {
- final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
-
- writtenRecords.put(testName, new ArrayList<>());
-
- int count = 10;
- PageView[] pageViews = generatePageViews(count);
- String profiles = Base64Serializer.serialize(generateProfiles(count));
-
- int partitionCount = 4;
- Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
-
- configs.put("streams.PageView.samza.system", "test");
- configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
- configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
- final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
- final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
- final StreamApplication app = appDesc -> {
- RemoteTableDescriptor<Integer, TestTableData.Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
- inputTableDesc
- .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles))
- .withRateLimiter(readRateLimiter, null, null);
-
- // dummy reader
- TableReadFunction readFn = new MyReadFunction();
-
- RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
- outputTableDesc
- .withReadFunction(readFn)
- .withWriteFunction(writer)
- .withRateLimiter(writeRateLimiter, null, null);
-
- Table<KV<Integer, EnrichedPageView>> outputTable = withCache
- ? getCachingTable(outputTableDesc, defaultCache, "output", appDesc)
- : appDesc.getTable(outputTableDesc);
-
- Table<KV<Integer, Profile>> inputTable = withCache
- ? getCachingTable(inputTableDesc, defaultCache, "input", appDesc)
- : appDesc.getTable(inputTableDesc);
-
- DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
- GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
- appDesc.getInputStream(isd)
- .map(pv -> new KV<>(pv.getMemberId(), pv))
- .join(inputTable, new PageViewToProfileJoinFunction())
- .map(m -> new KV(m.getMemberId(), m))
- .sendTo(outputTable);
- };
-
- Config config = new MapConfig(configs);
- final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
- executeRun(runner, config);
- runner.waitForFinish();
-
- int numExpected = count * partitionCount;
- Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
- Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
- }
-
- @Test
- public void testStreamTableJoinRemoteTable() throws Exception {
- doTestStreamTableJoinRemoteTable(false, false, "testStreamTableJoinRemoteTable");
- }
-
- @Test
- public void testStreamTableJoinRemoteTableWithCache() throws Exception {
- doTestStreamTableJoinRemoteTable(true, false, "testStreamTableJoinRemoteTableWithCache");
- }
-
- @Test
- public void testStreamTableJoinRemoteTableWithDefaultCache() throws Exception {
- doTestStreamTableJoinRemoteTable(true, true, "testStreamTableJoinRemoteTableWithDefaultCache");
- }
-
- private Context createMockContext() {
- MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
- doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), anyString());
- doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), anyString());
- Context context = new MockContext();
- doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
- doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
- return context;
- }
-
- @Test(expected = SamzaException.class)
- public void testCatchReaderException() {
- TableReadFunction<String, ?> reader = mock(TableReadFunction.class);
- CompletableFuture<String> future = new CompletableFuture<>();
- future.completeExceptionally(new RuntimeException("Expected test exception"));
- doReturn(future).when(reader).getAsync(anyString());
- TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
- RemoteReadableTable<String, ?> table = new RemoteReadableTable<>(
- "table1", reader, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
- table.init(createMockContext());
- table.get("abc");
- }
-
- @Test(expected = SamzaException.class)
- public void testCatchWriterException() {
- TableReadFunction<String, String> reader = mock(TableReadFunction.class);
- TableWriteFunction<String, String> writer = mock(TableWriteFunction.class);
- CompletableFuture<String> future = new CompletableFuture<>();
- future.completeExceptionally(new RuntimeException("Expected test exception"));
- doReturn(future).when(writer).putAsync(anyString(), any());
- TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
- RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<String, String>(
- "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
- table.init(createMockContext());
- table.put("abc", "efg");
- }
-}