You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/12/17 23:11:36 UTC
[3/4] samza git commit: SAMZA-2043: Consolidate ReadableTable and
ReadWriteTable
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index 93a0521..8b6bc1a 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -21,8 +21,7 @@ package org.apache.samza.table.remote;
import com.google.common.base.Preconditions;
import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
@@ -45,9 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class RemoteTableProvider extends BaseTableProvider {
- private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
-
- private boolean readOnly;
+ private final List<RemoteTable<?, ?>> tables = new ArrayList<>();
/**
* Map of tableId -> executor service for async table IO and callbacks. The same executors
@@ -63,21 +60,13 @@ public class RemoteTableProvider extends BaseTableProvider {
}
@Override
- public void init(Context context) {
- super.init(context);
- JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
- this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
- }
-
- @Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
- RemoteReadableTable table;
-
JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
+ // Read part
TableReadFunction readFn = getReadFn(tableConfig);
RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
if (rateLimiter != null) {
@@ -86,34 +75,29 @@ public class RemoteTableProvider extends BaseTableProvider {
TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
- TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
- TableRateLimiter writeRateLimiter = null;
-
TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
- TableRetryPolicy writeRetryPolicy = null;
-
- if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
- retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
- Thread thread = new Thread(runnable);
- thread.setName("table-retry-executor");
- thread.setDaemon(true);
- return thread;
- });
- }
-
if (readRetryPolicy != null) {
+ if (retryExecutor == null) {
+ retryExecutor = createRetryExecutor();
+ }
readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
}
- TableWriteFunction writeFn = getWriteFn(tableConfig);
-
boolean isRateLimited = readRateLimiter.isRateLimited();
- if (!readOnly) {
- writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
+
+ // Write part
+ TableWriteFunction writeFn = getWriteFn(tableConfig);
+ TableRateLimiter writeRateLimiter = null;
+ TableRetryPolicy writeRetryPolicy = null;
+ if (writeFn != null) {
+ TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
isRateLimited |= writeRateLimiter.isRateLimited();
writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
if (writeRetryPolicy != null) {
+ if (retryExecutor == null) {
+ retryExecutor = createRetryExecutor();
+ }
writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
}
}
@@ -140,13 +124,8 @@ public class RemoteTableProvider extends BaseTableProvider {
}));
}
- if (readOnly) {
- table = new RemoteReadableTable(tableId, readFn, readRateLimiter,
- tableExecutors.get(tableId), callbackExecutors.get(tableId));
- } else {
- table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter,
- writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
- }
+ RemoteTable table = new RemoteTable(tableId, readFn, writeFn, readRateLimiter,
+ writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
if (readRetryPolicy != null) {
@@ -192,5 +171,14 @@ public class RemoteTableProvider extends BaseTableProvider {
}
return writeFn;
}
+
+ private ScheduledExecutorService createRetryExecutor() {
+ return Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-retry-executor");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
new file mode 100644
index 0000000..df6833e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table.utils;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Utility class that contains the default set of read and write metrics.
+ */
+public class TableMetrics {
+
+ // Read metrics
+ public final Timer getNs;
+ public final Timer getAllNs;
+ public final Counter numGets;
+ public final Counter numGetAlls;
+ public final Counter numMissedLookups;
+ // Write metrics
+ public final Counter numPuts;
+ public final Timer putNs;
+ public final Counter numPutAlls;
+ public final Timer putAllNs;
+ public final Counter numDeletes;
+ public final Timer deleteNs;
+ public final Counter numDeleteAlls;
+ public final Timer deleteAllNs;
+ public final Counter numFlushes;
+ public final Timer flushNs;
+
+ /**
+ * Creates the default read and write counters and timers for the given table.
+ *
+ * @param context {@link Context} for this task
+ * @param table underlying table
+ * @param tableId table Id
+ */
+ public TableMetrics(Context context, Table table, String tableId) {
+ TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+ // Read metrics
+ numGets = tableMetricsUtil.newCounter("num-gets");
+ getNs = tableMetricsUtil.newTimer("get-ns");
+ numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+ getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+ numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
+ // Write metrics
+ numPuts = tableMetricsUtil.newCounter("num-puts");
+ putNs = tableMetricsUtil.newTimer("put-ns");
+ numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
+ putAllNs = tableMetricsUtil.newTimer("putAll-ns");
+ numDeletes = tableMetricsUtil.newCounter("num-deletes");
+ deleteNs = tableMetricsUtil.newTimer("delete-ns");
+ numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
+ deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
+ numFlushes = tableMetricsUtil.newCounter("num-flushes");
+ flushNs = tableMetricsUtil.newTimer("flush-ns");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
deleted file mode 100644
index e77fcfd..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-/**
- * Utility class that contains the default set of read metrics.
- */
-public class TableReadMetrics {
-
- public final Timer getNs;
- public final Timer getAllNs;
- public final Counter numGets;
- public final Counter numGetAlls;
- public final Counter numMissedLookups;
-
- /**
- * Constructor based on container and task container context
- *
- * @param context {@link Context} for this task
- * @param table underlying table
- * @param tableId table Id
- */
- public TableReadMetrics(Context context, Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
- numGets = tableMetricsUtil.newCounter("num-gets");
- getNs = tableMetricsUtil.newTimer("get-ns");
- numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
- getAllNs = tableMetricsUtil.newTimer("getAll-ns");
- numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
deleted file mode 100644
index bf65b74..0000000
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-public class TableWriteMetrics {
-
- public final Counter numPuts;
- public final Timer putNs;
- public final Counter numPutAlls;
- public final Timer putAllNs;
- public final Counter numDeletes;
- public final Timer deleteNs;
- public final Counter numDeleteAlls;
- public final Timer deleteAllNs;
- public final Counter numFlushes;
- public final Timer flushNs;
-
- /**
- * Utility class that contains the default set of write metrics.
- *
- * @param context {@link Context} for this task
- * @param table underlying table
- * @param tableId table Id
- */
- public TableWriteMetrics(Context context, Table table, String tableId) {
- TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
- numPuts = tableMetricsUtil.newCounter("num-puts");
- putNs = tableMetricsUtil.newTimer("put-ns");
- numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
- putAllNs = tableMetricsUtil.newTimer("putAll-ns");
- numDeletes = tableMetricsUtil.newCounter("num-deletes");
- deleteNs = tableMetricsUtil.newTimer("delete-ns");
- numDeleteAlls = tableMetricsUtil.newCounter("num-deleteAlls");
- deleteAllNs = tableMetricsUtil.newTimer("deleteAll-ns");
- numFlushes = tableMetricsUtil.newCounter("num-flushes");
- flushNs = tableMetricsUtil.newTimer("flush-ns");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
index 4112c8b..8fd161b 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
@@ -26,7 +26,7 @@ import org.apache.samza.operators.KV;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
@@ -71,7 +71,7 @@ public class TestStreamTableJoinOperatorImpl {
return record.getKey();
}
});
- ReadableTable table = mock(ReadableTable.class);
+ ReadWriteTable table = mock(ReadWriteTable.class);
when(table.get("1")).thenReturn("r1");
when(table.get("2")).thenReturn(null);
Context context = new MockContext();
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
index a3b1963..e60b6ff 100644
--- a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
+++ b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
@@ -49,12 +49,12 @@ public class TestTableManager {
public static class DummyTableProviderFactory implements TableProviderFactory {
- static ReadableTable table;
+ static ReadWriteTable table;
static TableProvider tableProvider;
@Override
public TableProvider getTableProvider(String tableId) {
- table = mock(ReadableTable.class);
+ table = mock(ReadWriteTable.class);
tableProvider = mock(TableProvider.class);
when(tableProvider.getTable()).thenReturn(table);
return tableProvider;
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 5a19767..c304bfd 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -35,11 +35,10 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
import org.apache.samza.table.caching.guava.GuavaCacheTable;
import org.apache.samza.table.descriptors.CachingTableDescriptor;
import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
@@ -138,11 +137,11 @@ public class TestCachingTable {
return Pair.of(cacheTable, cacheStore);
}
- private void initTables(ReadableTable ... tables) {
+ private void initTables(ReadWriteTable ... tables) {
initTables(false, tables);
}
- private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) {
+ private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... tables) {
Map<String, String> config = new HashMap<>();
if (isTimerMetricsDisabled) {
config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
@@ -242,7 +241,7 @@ public class TestCachingTable {
@Test
public void testNonexistentKeyInTable() {
- ReadableTable<String, String> table = mock(ReadableTable.class);
+ ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
doReturn(CompletableFuture.completedFuture(null)).when(table).getAsync(any());
ReadWriteTable<String, String> cache = getMockCache().getLeft();
CachingTable<String, String> cachingTable = new CachingTable<>("myTable", table, cache, false);
@@ -255,7 +254,7 @@ public class TestCachingTable {
@Test
public void testKeyEviction() {
- ReadableTable<String, String> table = mock(ReadableTable.class);
+ ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
doReturn(CompletableFuture.completedFuture("3")).when(table).getAsync(any());
ReadWriteTable<String, String> cache = mock(ReadWriteTable.class);
@@ -283,7 +282,7 @@ public class TestCachingTable {
TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+ final RemoteTable<String, String> remoteTable = new RemoteTable<>(
tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
@@ -402,7 +401,7 @@ public class TestCachingTable {
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
- final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+ final RemoteTable<String, String> remoteTable = new RemoteTable<>(
tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
index 486295a..a764a8b 100644
--- a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
@@ -30,7 +30,7 @@ import org.apache.samza.context.Context;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableProviderFactory;
import org.junit.Test;
@@ -143,7 +143,7 @@ public class TestLocalTableDescriptor {
}
@Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
throw new SamzaException("Not implemented");
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
deleted file mode 100644
index d7733a8..0000000
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.MockContext;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.retry.RetriableReadFunction;
-import org.apache.samza.table.retry.RetriableWriteFunction;
-import org.apache.samza.table.retry.TableRetryPolicy;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-
-public class TestRemoteReadWriteTable {
- private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
-
- public static Context getMockContext() {
- Context context = new MockContext();
- MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
- doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString());
- doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString());
- doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any());
- doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
- return context;
- }
-
- private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
- TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
- return getTable(tableId, readFn, writeFn, null);
- }
-
- private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
- TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
- RemoteReadableTable<K, V> table;
-
- TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
- TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
- doReturn(true).when(readRateLimiter).isRateLimited();
- doReturn(true).when(writeRateLimiter).isRateLimited();
-
- ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
-
- if (writeFn == null) {
- table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
- } else {
- table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
- }
-
- Context context = getMockContext();
-
- table.init(context);
-
- return (T) table;
- }
-
- private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
- String tableId = "testGet-" + sync + error + retry;
- TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
- // Sync is backed by async so needs to mock the async method
- CompletableFuture<String> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- if (!retry) {
- doReturn(future).when(readFn).getAsync(anyString());
- } else {
- final int [] times = new int[] {0};
- doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar"))
- .when(readFn).getAsync(anyString());
- }
- } else {
- future = CompletableFuture.completedFuture("bar");
- doReturn(future).when(readFn).getAsync(anyString());
- }
- if (retry) {
- doReturn(true).when(readFn).isRetriable(any());
- TableRetryPolicy policy = new TableRetryPolicy();
- readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
- }
- RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
- Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
- verify(table.readRateLimiter, times(1)).throttle(anyString());
- }
-
- @Test
- public void testGet() throws Exception {
- doTestGet(true, false, false);
- }
-
- @Test
- public void testGetAsync() throws Exception {
- doTestGet(false, false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testGetAsyncError() throws Exception {
- doTestGet(false, true, false);
- }
-
- @Test
- public void testGetAsyncErrorRetried() throws Exception {
- doTestGet(false, true, true);
- }
-
- @Test
- public void testGetMultipleTables() {
- TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
- TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
-
- // Sync is backed by async so needs to mock the async method
- doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
- doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
-
- RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
- RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
-
- CompletableFuture<String> future1 = table1.getAsync("foo1");
- CompletableFuture<String> future2 = table2.getAsync("foo2");
-
- CompletableFuture.allOf(future1, future2)
- .thenAccept(u -> {
- Assert.assertEquals(future1.join(), "bar1");
- Assert.assertEquals(future2.join(), "bar1");
- });
- }
-
- private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
- String tableId = "testPut-" + sync + error + isDelete + retry;
- TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
- TableWriteFunction<String, String> writeFn = mockWriteFn;
- CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
- CompletableFuture<Void> failureFuture = new CompletableFuture();
- failureFuture.completeExceptionally(new RuntimeException("Test exception"));
- if (!error) {
- if (isDelete) {
- doReturn(successFuture).when(writeFn).deleteAsync(any());
- } else {
- doReturn(successFuture).when(writeFn).putAsync(any(), any());
- }
- } else if (!retry) {
- if (isDelete) {
- doReturn(failureFuture).when(writeFn).deleteAsync(any());
- } else {
- doReturn(failureFuture).when(writeFn).putAsync(any(), any());
- }
- } else {
- doReturn(true).when(writeFn).isRetriable(any());
- final int [] times = new int[] {0};
- if (isDelete) {
- doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
- } else {
- doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
- }
- writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
- }
- RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
- if (sync) {
- table.put("foo", isDelete ? null : "bar");
- } else {
- table.putAsync("foo", isDelete ? null : "bar").get();
- }
- ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
- ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
- if (isDelete) {
- verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture());
- } else {
- verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture());
- Assert.assertEquals("bar", valCaptor.getValue());
- }
- Assert.assertEquals("foo", keyCaptor.getValue());
- if (isDelete) {
- verify(table.writeRateLimiter, times(1)).throttle(anyString());
- } else {
- verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
- }
- }
-
- @Test
- public void testPut() throws Exception {
- doTestPut(true, false, false, false);
- }
-
- @Test
- public void testPutDelete() throws Exception {
- doTestPut(true, false, true, false);
- }
-
- @Test
- public void testPutAsync() throws Exception {
- doTestPut(false, false, false, false);
- }
-
- @Test
- public void testPutAsyncDelete() throws Exception {
- doTestPut(false, false, true, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testPutAsyncError() throws Exception {
- doTestPut(false, true, false, false);
- }
-
- @Test
- public void testPutAsyncErrorRetried() throws Exception {
- doTestPut(false, true, false, true);
- }
-
- private void doTestDelete(boolean sync, boolean error) throws Exception {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
- mock(TableReadFunction.class), writeFn);
- CompletableFuture<Void> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(writeFn).deleteAsync(any());
- ArgumentCaptor<String> argCaptor = ArgumentCaptor.forClass(String.class);
- if (sync) {
- table.delete("foo");
- } else {
- table.deleteAsync("foo").get();
- }
- verify(writeFn, times(1)).deleteAsync(argCaptor.capture());
- Assert.assertEquals("foo", argCaptor.getValue());
- verify(table.writeRateLimiter, times(1)).throttle(anyString());
- }
-
- @Test
- public void testDelete() throws Exception {
- doTestDelete(true, false);
- }
-
- @Test
- public void testDeleteAsync() throws Exception {
- doTestDelete(false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testDeleteAsyncError() throws Exception {
- doTestDelete(false, true);
- }
-
- private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
- TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
- Map<String, String> res = new HashMap<>();
- res.put("foo1", "bar1");
- if (!partial) {
- res.put("foo2", "bar2");
- }
- CompletableFuture<Map<String, String>> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(res);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(readFn).getAllAsync(any());
- RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
- Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
- : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
- verify(table.readRateLimiter, times(1)).throttle(anyCollection());
- }
-
- @Test
- public void testGetAll() throws Exception {
- doTestGetAll(true, false, false);
- }
-
- @Test
- public void testGetAllAsync() throws Exception {
- doTestGetAll(false, false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testGetAllAsyncError() throws Exception {
- doTestGetAll(false, true, false);
- }
-
- // Partial result is an acceptable scenario
- @Test
- public void testGetAllPartialResult() throws Exception {
- doTestGetAll(false, false, true);
- }
-
- public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
- mock(TableReadFunction.class), writeFn);
- CompletableFuture<Void> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(writeFn).putAllAsync(any());
- if (hasDelete) {
- doReturn(future).when(writeFn).deleteAllAsync(any());
- }
- List<Entry<String, String>> entries = Arrays.asList(
- new Entry<>("foo1", "bar1"), new Entry<>("foo2", hasDelete ? null : "bar2"));
- ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
- if (sync) {
- table.putAll(entries);
- } else {
- table.putAllAsync(entries).get();
- }
- verify(writeFn, times(1)).putAllAsync(argCaptor.capture());
- if (hasDelete) {
- ArgumentCaptor<List> delArgCaptor = ArgumentCaptor.forClass(List.class);
- verify(writeFn, times(1)).deleteAllAsync(delArgCaptor.capture());
- Assert.assertEquals(Arrays.asList("foo2"), delArgCaptor.getValue());
- Assert.assertEquals(1, argCaptor.getValue().size());
- Assert.assertEquals("foo1", ((Entry) argCaptor.getValue().get(0)).getKey());
- verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
- } else {
- Assert.assertEquals(entries, argCaptor.getValue());
- }
- verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
- }
-
- @Test
- public void testPutAll() throws Exception {
- doTestPutAll(true, false, false);
- }
-
- @Test
- public void testPutAllHasDelete() throws Exception {
- doTestPutAll(true, false, true);
- }
-
- @Test
- public void testPutAllAsync() throws Exception {
- doTestPutAll(false, false, false);
- }
-
- @Test
- public void testPutAllAsyncHasDelete() throws Exception {
- doTestPutAll(false, false, true);
- }
-
- @Test(expected = ExecutionException.class)
- public void testPutAllAsyncError() throws Exception {
- doTestPutAll(false, true, false);
- }
-
- public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
- mock(TableReadFunction.class), writeFn);
- CompletableFuture<Void> future;
- if (error) {
- future = new CompletableFuture();
- future.completeExceptionally(new RuntimeException("Test exception"));
- } else {
- future = CompletableFuture.completedFuture(null);
- }
- // Sync is backed by async so needs to mock the async method
- doReturn(future).when(writeFn).deleteAllAsync(any());
- List<String> keys = Arrays.asList("foo1", "foo2");
- ArgumentCaptor<List> argCaptor = ArgumentCaptor.forClass(List.class);
- if (sync) {
- table.deleteAll(keys);
- } else {
- table.deleteAllAsync(keys).get();
- }
- verify(writeFn, times(1)).deleteAllAsync(argCaptor.capture());
- Assert.assertEquals(keys, argCaptor.getValue());
- verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
- }
-
- @Test
- public void testDeleteAll() throws Exception {
- doTestDeleteAll(true, false);
- }
-
- @Test
- public void testDeleteAllAsync() throws Exception {
- doTestDeleteAll(false, false);
- }
-
- @Test(expected = ExecutionException.class)
- public void testDeleteAllAsyncError() throws Exception {
- doTestDeleteAll(false, true);
- }
-
- @Test
- public void testFlush() {
- TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
- RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
- table.flush();
- verify(writeFn, times(1)).flush();
- }
-
- @Test
- public void testGetWithCallbackExecutor() throws Exception {
- TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
- // Sync is backed by async so needs to mock the async method
- doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
- RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
- Executors.newSingleThreadExecutor());
- Thread testThread = Thread.currentThread();
-
- table.getAsync("foo").thenAccept(result -> {
- Assert.assertEquals("bar", result);
- // Must be executed on the executor thread
- Assert.assertNotSame(testThread, Thread.currentThread());
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
new file mode 100644
index 0000000..ae96d86
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import org.apache.samza.context.Context;
+import org.apache.samza.context.MockContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.retry.RetriableReadFunction;
+import org.apache.samza.table.retry.RetriableWriteFunction;
+import org.apache.samza.table.retry.TableRetryPolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+public class TestRemoteTable {
+ private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
+
+ /**
+ * Builds a {@link MockContext} whose container metrics registry is a Mockito mock that hands out
+ * real {@link Timer}, {@link Counter} and {@link Gauge} instances, each constructed from the
+ * first argument of the registry call.
+ *
+ * @return the prepared context
+ */
+ public static Context getMockContext() {
+ MetricsRegistry registry = mock(MetricsRegistry.class);
+ doAnswer(invocation -> new Timer((String) invocation.getArguments()[0]))
+ .when(registry).newTimer(anyString(), anyString());
+ doAnswer(invocation -> new Counter((String) invocation.getArguments()[0]))
+ .when(registry).newCounter(anyString(), anyString());
+ doAnswer(invocation -> new Gauge((String) invocation.getArguments()[0], 0))
+ .when(registry).newGauge(anyString(), any());
+
+ Context mockContext = new MockContext();
+ doReturn(registry).when(mockContext.getContainerContext()).getContainerMetricsRegistry();
+ return mockContext;
+ }
+
+ /**
+ * Convenience overload of {@code getTable} that passes {@code null} as the callback executor.
+ */
+ private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+ TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
+ final ExecutorService noCallbackExecutor = null;
+ return getTable(tableId, readFn, writeFn, noCallbackExecutor);
+ }
+
+ /**
+ * Creates a {@link RemoteTable} with mocked read/write rate limiters (both reporting
+ * {@code isRateLimited() == true}) and a fresh single-thread table executor, then initializes it
+ * with the context from {@code getMockContext()}.
+ *
+ * @param tableId id given to the table
+ * @param readFn read function handed to the table
+ * @param writeFn write function handed to the table; callers in this class may pass {@code null}
+ * @param cbExecutor callback executor handed to the table; may be {@code null}
+ * @return the initialized table, cast to the caller's expected type
+ */
+ private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
+ TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
+ TableRateLimiter<K, V> readLimiter = mock(TableRateLimiter.class);
+ doReturn(true).when(readLimiter).isRateLimited();
+ TableRateLimiter<K, V> writeLimiter = mock(TableRateLimiter.class);
+ doReturn(true).when(writeLimiter).isRateLimited();
+
+ RemoteTable<K, V> remoteTable = new RemoteTable<>(tableId, readFn, writeFn, readLimiter, writeLimiter,
+ Executors.newSingleThreadExecutor(), cbExecutor);
+ remoteTable.init(getMockContext());
+
+ return (T) remoteTable;
+ }
+
+ /**
+ * Performs one lookup of key "foo" through a {@link RemoteTable} and expects the value "bar",
+ * then verifies the read rate limiter was throttled exactly once.
+ *
+ * @param sync call {@code get} when true, {@code getAsync(...).get()} otherwise
+ * @param error make the read function return an exceptionally completed future
+ * @param retry wrap the read function in a {@link RetriableReadFunction}; combined with
+ * {@code error}, only the first call fails
+ */
+ private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception {
+ TableReadFunction<String, String> mockReadFn = mock(TableReadFunction.class);
+ // Sync is backed by async so only the async method is stubbed
+ if (!error) {
+ doReturn(CompletableFuture.completedFuture("bar")).when(mockReadFn).getAsync(anyString());
+ } else {
+ CompletableFuture<String> failed = new CompletableFuture<>();
+ failed.completeExceptionally(new RuntimeException("Test exception"));
+ if (retry) {
+ // First attempt fails, every later attempt succeeds
+ final int[] attempts = {0};
+ doAnswer(invocation -> attempts[0]++ == 0 ? failed : CompletableFuture.completedFuture("bar"))
+ .when(mockReadFn).getAsync(anyString());
+ } else {
+ doReturn(failed).when(mockReadFn).getAsync(anyString());
+ }
+ }
+
+ TableReadFunction<String, String> readFn = mockReadFn;
+ if (retry) {
+ doReturn(true).when(mockReadFn).isRetriable(any());
+ readFn = new RetriableReadFunction<>(new TableRetryPolicy(), mockReadFn, schedExec);
+ }
+
+ RemoteTable<String, String> table = getTable("testGet-" + sync + error + retry, readFn, null);
+ String actual;
+ if (sync) {
+ actual = table.get("foo");
+ } else {
+ actual = table.getAsync("foo").get();
+ }
+ Assert.assertEquals("bar", actual);
+ verify(table.readRateLimiter, times(1)).throttle(anyString());
+ }
+
+ // Synchronous get, no error
+ @Test
+ public void testGet() throws Exception {
+ doTestGet(true, false, false);
+ }
+
+ // Asynchronous get, no error
+ @Test
+ public void testGetAsync() throws Exception {
+ doTestGet(false, false, false);
+ }
+
+ // Asynchronous get whose read fails, no retry; the failure must surface as an ExecutionException
+ @Test(expected = ExecutionException.class)
+ public void testGetAsyncError() throws Exception {
+ doTestGet(false, true, false);
+ }
+
+ // Asynchronous get whose first read fails, with retry enabled; the lookup must still return the value
+ @Test
+ public void testGetAsyncErrorRetried() throws Exception {
+ doTestGet(false, true, true);
+ }
+
+ /**
+ * Verifies that two tables, each backed by its own read function, return their own values
+ * for {@code getAsync} calls issued back to back.
+ */
+ @Test
+ public void testGetMultipleTables() {
+ TableReadFunction<String, String> readFn1 = mock(TableReadFunction.class);
+ TableReadFunction<String, String> readFn2 = mock(TableReadFunction.class);
+
+ // Sync is backed by async so needs to mock the async method.
+ // Each table gets its own stub so the two results are distinguishable.
+ doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
+ doReturn(CompletableFuture.completedFuture("bar2")).when(readFn2).getAsync(anyString());
+
+ RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
+ RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
+
+ CompletableFuture<String> future1 = table1.getAsync("foo1");
+ CompletableFuture<String> future2 = table2.getAsync("foo2");
+
+ // Wait for both lookups and assert on the test thread: an assertion thrown inside an
+ // unobserved thenAccept callback would be swallowed and could never fail the test.
+ CompletableFuture.allOf(future1, future2).join();
+ Assert.assertEquals("bar1", future1.join());
+ Assert.assertEquals("bar2", future2.join());
+ }
+
+ /**
+ * Drives a single put of key "foo" through a {@link RemoteTable} and verifies the resulting
+ * write-function and write-rate-limiter interactions.
+ *
+ * @param sync call {@code put} when true, {@code putAsync(...).get()} otherwise
+ * @param error make the (first) write complete exceptionally
+ * @param isDelete put a {@code null} value; the test then expects {@code deleteAsync} to be
+ * invoked instead of {@code putAsync}
+ * @param retry together with {@code error}: fail only the first attempt and wrap the write
+ * function in a {@link RetriableWriteFunction}
+ */
+ private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception {
+ String tableId = "testPut-" + sync + error + isDelete + retry;
+ TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+ TableWriteFunction<String, String> writeFn = mockWriteFn;
+ CompletableFuture<Void> successFuture = CompletableFuture.completedFuture(null);
+ CompletableFuture<Void> failureFuture = new CompletableFuture<>();
+ failureFuture.completeExceptionally(new RuntimeException("Test exception"));
+ if (!error) {
+ if (isDelete) {
+ doReturn(successFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doReturn(successFuture).when(writeFn).putAsync(any(), any());
+ }
+ } else if (!retry) {
+ if (isDelete) {
+ doReturn(failureFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doReturn(failureFuture).when(writeFn).putAsync(any(), any());
+ }
+ } else {
+ doReturn(true).when(writeFn).isRetriable(any());
+ // First attempt fails, every later attempt succeeds
+ final int [] times = new int[] {0};
+ if (isDelete) {
+ doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any());
+ } else {
+ doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any());
+ }
+ writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
+ }
+ RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
+ if (sync) {
+ table.put("foo", isDelete ? null : "bar");
+ } else {
+ table.putAsync("foo", isDelete ? null : "bar").get();
+ }
+ ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> valCaptor = ArgumentCaptor.forClass(String.class);
+ // A second attempt only happens on the error-and-retry path, for deletes as well as puts;
+ // without an error the write function is not wrapped and is called exactly once.
+ int expectedCalls = (error && retry) ? 2 : 1;
+ if (isDelete) {
+ verify(mockWriteFn, times(expectedCalls)).deleteAsync(keyCaptor.capture());
+ } else {
+ verify(mockWriteFn, times(expectedCalls)).putAsync(keyCaptor.capture(), valCaptor.capture());
+ Assert.assertEquals("bar", valCaptor.getValue());
+ }
+ Assert.assertEquals("foo", keyCaptor.getValue());
+ // The rate limiter is consulted once per table call, regardless of retries
+ if (isDelete) {
+ verify(table.writeRateLimiter, times(1)).throttle(anyString());
+ } else {
+ verify(table.writeRateLimiter, times(1)).throttle(anyString(), anyString());
+ }
+ }
+
+ // Synchronous put of a non-null value
+ @Test
+ public void testPut() throws Exception {
+ doTestPut(true, false, false, false);
+ }
+
+ // Synchronous put of a null value, which the helper expects to be issued as a delete
+ @Test
+ public void testPutDelete() throws Exception {
+ doTestPut(true, false, true, false);
+ }
+
+ // Asynchronous put of a non-null value
+ @Test
+ public void testPutAsync() throws Exception {
+ doTestPut(false, false, false, false);
+ }
+
+ // Asynchronous put of a null value, which the helper expects to be issued as a delete
+ @Test
+ public void testPutAsyncDelete() throws Exception {
+ doTestPut(false, false, true, false);
+ }
+
+ // Asynchronous put whose write fails, no retry; the failure must surface as an ExecutionException
+ @Test(expected = ExecutionException.class)
+ public void testPutAsyncError() throws Exception {
+ doTestPut(false, true, false, false);
+ }
+
+ // Asynchronous put whose first write fails, with retry enabled; the put must still succeed
+ @Test
+ public void testPutAsyncErrorRetried() throws Exception {
+ doTestPut(false, true, false, true);
+ }
+
+ /**
+ * Deletes key "foo" through a {@link RemoteTable} and verifies exactly one {@code deleteAsync}
+ * call on the write function and one throttle call on the write rate limiter.
+ *
+ * @param sync call {@code delete} when true, {@code deleteAsync(...).get()} otherwise
+ * @param error make the write function return an exceptionally completed future
+ */
+ private void doTestDelete(boolean sync, boolean error) throws Exception {
+ CompletableFuture<Void> outcome = new CompletableFuture<>();
+ if (error) {
+ outcome.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ outcome.complete(null);
+ }
+
+ TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+ RemoteTable<String, String> table = getTable("testDelete-" + sync + error,
+ mock(TableReadFunction.class), mockWriteFn);
+ // Sync is backed by async so only the async method is stubbed
+ doReturn(outcome).when(mockWriteFn).deleteAsync(any());
+
+ if (sync) {
+ table.delete("foo");
+ } else {
+ table.deleteAsync("foo").get();
+ }
+
+ ArgumentCaptor<String> deletedKey = ArgumentCaptor.forClass(String.class);
+ verify(mockWriteFn, times(1)).deleteAsync(deletedKey.capture());
+ Assert.assertEquals("foo", deletedKey.getValue());
+ verify(table.writeRateLimiter, times(1)).throttle(anyString());
+ }
+
+ // Synchronous delete, no error
+ @Test
+ public void testDelete() throws Exception {
+ doTestDelete(true, false);
+ }
+
+ // Asynchronous delete, no error
+ @Test
+ public void testDeleteAsync() throws Exception {
+ doTestDelete(false, false);
+ }
+
+ // Asynchronous delete whose write fails; the failure must surface as an ExecutionException
+ @Test(expected = ExecutionException.class)
+ public void testDeleteAsyncError() throws Exception {
+ doTestDelete(false, true);
+ }
+
+ /**
+ * Looks up keys "foo1" and "foo2" in one batch through a {@link RemoteTable}, expects the map
+ * returned by the stubbed read function, and verifies one collection throttle call on the read
+ * rate limiter.
+ *
+ * @param sync call {@code getAll} when true, {@code getAllAsync(...).get()} otherwise
+ * @param error make the read function return an exceptionally completed future
+ * @param partial have the read function return only the entry for "foo1"
+ */
+ private void doTestGetAll(boolean sync, boolean error, boolean partial) throws Exception {
+ Map<String, String> expected = new HashMap<>();
+ expected.put("foo1", "bar1");
+ if (!partial) {
+ expected.put("foo2", "bar2");
+ }
+
+ CompletableFuture<Map<String, String>> outcome = new CompletableFuture<>();
+ if (error) {
+ outcome.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ outcome.complete(expected);
+ }
+
+ TableReadFunction<String, String> mockReadFn = mock(TableReadFunction.class);
+ // Sync is backed by async so only the async method is stubbed
+ doReturn(outcome).when(mockReadFn).getAllAsync(any());
+ RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, mockReadFn, null);
+
+ List<String> keys = Arrays.asList("foo1", "foo2");
+ Assert.assertEquals(expected, sync ? table.getAll(keys) : table.getAllAsync(keys).get());
+ verify(table.readRateLimiter, times(1)).throttle(anyCollection());
+ }
+
+ // Synchronous batch get, full result
+ @Test
+ public void testGetAll() throws Exception {
+ doTestGetAll(true, false, false);
+ }
+
+ // Asynchronous batch get, full result
+ @Test
+ public void testGetAllAsync() throws Exception {
+ doTestGetAll(false, false, false);
+ }
+
+ // Asynchronous batch get whose read fails; the failure must surface as an ExecutionException
+ @Test(expected = ExecutionException.class)
+ public void testGetAllAsyncError() throws Exception {
+ doTestGetAll(false, true, false);
+ }
+
+ // Partial result is an acceptable scenario: the asynchronous batch get returns only one of
+ // the two requested keys and the table must pass that map through unchanged
+ @Test
+ public void testGetAllPartialResult() throws Exception {
+ doTestGetAll(false, false, true);
+ }
+
+ /**
+ * Writes two records ("foo1" and "foo2") in one batch through a {@link RemoteTable} and verifies
+ * what reaches the write function and the write rate limiter.
+ *
+ * @param sync call {@code putAll} when true, {@code putAllAsync(...).get()} otherwise
+ * @param error make the write function return an exceptionally completed future
+ * @param hasDelete give "foo2" a {@code null} value; the test then expects it to be routed to
+ * {@code deleteAllAsync} while only "foo1" reaches {@code putAllAsync}
+ */
+ public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
+ CompletableFuture<Void> outcome = new CompletableFuture<>();
+ if (error) {
+ outcome.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ outcome.complete(null);
+ }
+
+ TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+ RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
+ mock(TableReadFunction.class), mockWriteFn);
+ // Sync is backed by async so only the async methods are stubbed
+ doReturn(outcome).when(mockWriteFn).putAllAsync(any());
+ if (hasDelete) {
+ doReturn(outcome).when(mockWriteFn).deleteAllAsync(any());
+ }
+
+ Entry<String, String> first = new Entry<>("foo1", "bar1");
+ Entry<String, String> second = new Entry<>("foo2", hasDelete ? null : "bar2");
+ List<Entry<String, String>> records = Arrays.asList(first, second);
+ if (sync) {
+ table.putAll(records);
+ } else {
+ table.putAllAsync(records).get();
+ }
+
+ ArgumentCaptor<List> putCaptor = ArgumentCaptor.forClass(List.class);
+ verify(mockWriteFn, times(1)).putAllAsync(putCaptor.capture());
+ if (!hasDelete) {
+ Assert.assertEquals(records, putCaptor.getValue());
+ } else {
+ ArgumentCaptor<List> deleteCaptor = ArgumentCaptor.forClass(List.class);
+ verify(mockWriteFn, times(1)).deleteAllAsync(deleteCaptor.capture());
+ Assert.assertEquals(Arrays.asList("foo2"), deleteCaptor.getValue());
+ List putRecords = putCaptor.getValue();
+ Assert.assertEquals(1, putRecords.size());
+ Assert.assertEquals("foo1", ((Entry) putRecords.get(0)).getKey());
+ verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+ }
+ verify(table.writeRateLimiter, times(1)).throttleRecords(anyCollection());
+ }
+
+ // Synchronous batch put, all values non-null
+ @Test
+ public void testPutAll() throws Exception {
+ doTestPutAll(true, false, false);
+ }
+
+ // Synchronous batch put where one record carries a null value
+ @Test
+ public void testPutAllHasDelete() throws Exception {
+ doTestPutAll(true, false, true);
+ }
+
+ // Asynchronous batch put, all values non-null
+ @Test
+ public void testPutAllAsync() throws Exception {
+ doTestPutAll(false, false, false);
+ }
+
+ // Asynchronous batch put where one record carries a null value
+ @Test
+ public void testPutAllAsyncHasDelete() throws Exception {
+ doTestPutAll(false, false, true);
+ }
+
+ // Asynchronous batch put whose write fails; the failure must surface as an ExecutionException
+ @Test(expected = ExecutionException.class)
+ public void testPutAllAsyncError() throws Exception {
+ doTestPutAll(false, true, false);
+ }
+
+ /**
+ * Deletes keys "foo1" and "foo2" in one batch through a {@link RemoteTable} and verifies that
+ * the same key list reaches {@code deleteAllAsync} exactly once and that the write rate limiter
+ * is throttled once with a collection.
+ *
+ * @param sync call {@code deleteAll} when true, {@code deleteAllAsync(...).get()} otherwise
+ * @param error make the write function return an exceptionally completed future
+ */
+ public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
+ CompletableFuture<Void> outcome = new CompletableFuture<>();
+ if (error) {
+ outcome.completeExceptionally(new RuntimeException("Test exception"));
+ } else {
+ outcome.complete(null);
+ }
+
+ TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+ RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
+ mock(TableReadFunction.class), mockWriteFn);
+ // Sync is backed by async so only the async method is stubbed
+ doReturn(outcome).when(mockWriteFn).deleteAllAsync(any());
+
+ List<String> doomedKeys = Arrays.asList("foo1", "foo2");
+ if (sync) {
+ table.deleteAll(doomedKeys);
+ } else {
+ table.deleteAllAsync(doomedKeys).get();
+ }
+
+ ArgumentCaptor<List> deleteCaptor = ArgumentCaptor.forClass(List.class);
+ verify(mockWriteFn, times(1)).deleteAllAsync(deleteCaptor.capture());
+ Assert.assertEquals(doomedKeys, deleteCaptor.getValue());
+ verify(table.writeRateLimiter, times(1)).throttle(anyCollection());
+ }
+
+ // Synchronous batch delete, no error
+ @Test
+ public void testDeleteAll() throws Exception {
+ doTestDeleteAll(true, false);
+ }
+
+ // Asynchronous batch delete, no error
+ @Test
+ public void testDeleteAllAsync() throws Exception {
+ doTestDeleteAll(false, false);
+ }
+
+ // Asynchronous batch delete whose write fails; the failure must surface as an ExecutionException
+ @Test(expected = ExecutionException.class)
+ public void testDeleteAllAsyncError() throws Exception {
+ doTestDeleteAll(false, true);
+ }
+
+ // flush() on the table must be forwarded to the write function exactly once
+ @Test
+ public void testFlush() {
+ TableWriteFunction<String, String> mockWriteFn = mock(TableWriteFunction.class);
+ TableReadFunction<String, String> mockReadFn = mock(TableReadFunction.class);
+ RemoteTable<String, String> table = getTable("testFlush", mockReadFn, mockWriteFn);
+
+ table.flush();
+
+ // verify(mock) is Mockito shorthand for verify(mock, times(1))
+ verify(mockWriteFn).flush();
+ }
+
+ // Issues getAsync() on a table built with a dedicated single-thread callback executor and
+ // attaches a callback that checks the value and that it is not running on the test thread.
+ // NOTE(review): the future returned by thenAccept() is never joined, so an assertion failure
+ // inside the callback cannot fail this test; the executor created here is also never shut down.
+ // NOTE(review): before joining, confirm the callback cannot run inline on the test thread when
+ // the future is already complete at the time thenAccept() is attached.
+ @Test
+ public void testGetWithCallbackExecutor() throws Exception {
+ TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
+ // Sync is backed by async so needs to mock the async method
+ doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
+ RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
+ Executors.newSingleThreadExecutor());
+ // Captured so the callback can compare its own thread against the test thread
+ Thread testThread = Thread.currentThread();
+
+ table.getAsync("foo").thenAccept(result -> {
+ Assert.assertEquals("bar", result);
+ // Must be executed on the executor thread
+ Assert.assertNotSame(testThread, Thread.currentThread());
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
index 3d8e36f..907242f 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
@@ -36,17 +36,17 @@ import org.apache.samza.metrics.Timer;
import org.apache.samza.table.Table;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.table.remote.RemoteTableProvider;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
-
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.util.EmbeddedTaggedRateLimiter;
import org.apache.samza.util.RateLimiter;
+
import org.junit.Assert;
import org.junit.Test;
@@ -207,8 +207,8 @@ public class TestRemoteTableDescriptor {
RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId());
provider.init(createMockContext(desc));
Table table = provider.getTable();
- Assert.assertTrue(table instanceof RemoteReadWriteTable);
- RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
+ Assert.assertTrue(table instanceof RemoteTable);
+ RemoteTable rwTable = (RemoteTable) table;
if (numRateLimitOps > 0) {
Assert.assertTrue(!rlGets || rwTable.getReadRateLimiter() != null);
Assert.assertTrue(!rlPuts || rwTable.getWriteRateLimiter() != null);
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
index 5116cab..050ea55 100644
--- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
+++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
@@ -35,7 +35,7 @@ import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.Table;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.remote.TestRemoteReadWriteTable;
+import org.apache.samza.table.remote.TestRemoteTable;
import org.apache.samza.table.utils.TableMetricsUtil;
import org.junit.Test;
@@ -54,7 +54,7 @@ public class TestRetriableTableFunctions {
public TableMetricsUtil getMetricsUtil(String tableId) {
Table table = mock(Table.class);
- Context context = TestRemoteReadWriteTable.getMockContext();
+ Context context = TestRemoteTable.getMockContext();
return new TableMetricsUtil(context, table, tableId);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
index ddb79ba..fc9ce76 100644
--- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.samza.storage.kv.inmemory;
import java.util.Map;
-import junit.framework.Assert;
+
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
@@ -28,7 +28,9 @@ import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.storage.kv.LocalTableProviderFactory;
import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+
import org.junit.Test;
+import org.junit.Assert;
public class TestInMemoryTableDescriptor {
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index 62fb3da..319fb0f 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.samza.storage.kv.descriptors;
import java.util.Map;
-import junit.framework.Assert;
+
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.MapConfig;
@@ -30,7 +30,9 @@ import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.LocalTableProviderFactory;
import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+
import org.junit.Test;
+import org.junit.Assert;
public class TestRocksDbTableDescriptor {
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
deleted file mode 100644
index eae6bb0..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * A store backed readable and writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
- implements ReadWriteTable<K, V> {
-
- /**
- * Constructs an instance of {@link LocalReadWriteTable}
- * @param tableId the table Id
- * @param kvStore the backing store
- */
- public LocalReadWriteTable(String tableId, KeyValueStore kvStore) {
- super(tableId, kvStore);
- }
-
- @Override
- public void put(K key, V value) {
- if (value != null) {
- instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value));
- } else {
- delete(key);
- }
- }
-
- @Override
- public CompletableFuture<Void> putAsync(K key, V value) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- put(key, value);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- List<Entry<K, V>> toPut = new LinkedList<>();
- List<K> toDelete = new LinkedList<>();
- entries.forEach(e -> {
- if (e.getValue() != null) {
- toPut.add(e);
- } else {
- toDelete.add(e.getKey());
- }
- });
-
- if (!toPut.isEmpty()) {
- instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(toPut));
- }
-
- if (!toDelete.isEmpty()) {
- deleteAll(toDelete);
- }
- }
-
- @Override
- public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- putAll(entries);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void delete(K key) {
- instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key));
- }
-
- @Override
- public CompletableFuture<Void> deleteAsync(K key) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- delete(key);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void deleteAll(List<K> keys) {
- instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys));
- }
-
- @Override
- public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
- CompletableFuture<Void> future = new CompletableFuture();
- try {
- deleteAll(keys);
- future.complete(null);
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void flush() {
- instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush());
- }
-
- private interface Func0 {
- void apply();
- }
-
- private void instrument(Counter counter, Timer timer, Func0 func) {
- incCounter(counter);
- long startNs = clock.nanoTime();
- func.apply();
- updateTimer(timer, clock.nanoTime() - startNs);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
deleted file mode 100644
index 29ddb15..0000000
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-
-import com.google.common.base.Supplier;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A store backed readable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
-
- protected final KeyValueStore<K, V> kvStore;
-
- /**
- * Constructs an instance of {@link LocalReadableTable}
- * @param tableId the table Id
- * @param kvStore the backing store
- */
- public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
- super(tableId);
- Preconditions.checkNotNull(kvStore, "null KeyValueStore");
- this.kvStore = kvStore;
- }
-
- @Override
- public V get(K key) {
- V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key));
- if (result == null) {
- incCounter(readMetrics.numMissedLookups);
- }
- return result;
- }
-
- @Override
- public CompletableFuture<V> getAsync(K key) {
- CompletableFuture<V> future = new CompletableFuture();
- try {
- future.complete(get(key));
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public Map<K, V> getAll(List<K> keys) {
- Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys));
- result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
- return result;
- }
-
- @Override
- public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
- CompletableFuture<Map<K, V>> future = new CompletableFuture();
- try {
- future.complete(getAll(keys));
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public void close() {
- // The KV store is not closed here as it may still be needed by downstream operators,
- // it will be closed by the SamzaContainer
- }
-
- private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
- incCounter(counter);
- long startNs = clock.nanoTime();
- T result = func.get();
- updateTimer(timer, clock.nanoTime() - startNs);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
new file mode 100644
index 0000000..d9767b6
--- /dev/null
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.BaseReadWriteTable;
+
+import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
+import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
+
+
+/**
+ * A readable and writable table backed by a local {@link KeyValueStore}.
+ *
+ * <p>Synchronous operations increment the matching counter of {@code metrics} and report their
+ * latency to the matching timer. The asynchronous variants run the synchronous operation on the
+ * calling thread and return a future that is already completed.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalTable<K, V> extends BaseReadWriteTable<K, V> {
+
+  protected final KeyValueStore<K, V> kvStore;
+
+  /**
+   * Constructs an instance of {@link LocalTable}
+   * @param tableId the table Id
+   * @param kvStore the backing store; must not be null
+   */
+  public LocalTable(String tableId, KeyValueStore<K, V> kvStore) {
+    super(tableId);
+    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
+    this.kvStore = kvStore;
+  }
+
+  /**
+   * Looks up {@code key} in the backing store, counting a missed lookup when the result is null.
+   */
+  @Override
+  public V get(K key) {
+    V result = instrument(metrics.numGets, metrics.getNs, () -> kvStore.get(key));
+    if (result == null) {
+      incCounter(metrics.numMissedLookups);
+    }
+    return result;
+  }
+
+  /**
+   * Runs {@link #get(Object)} on the calling thread and wraps its outcome in a completed future.
+   */
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    return supplyCompleted(() -> get(key));
+  }
+
+  /**
+   * Looks up all {@code keys} in one store call; every null value in the result counts as a
+   * missed lookup.
+   */
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Map<K, V> result = instrument(metrics.numGetAlls, metrics.getAllNs, () -> kvStore.getAll(keys));
+    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
+    return result;
+  }
+
+  /**
+   * Runs {@link #getAll(List)} on the calling thread and wraps its outcome in a completed future.
+   */
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    return supplyCompleted(() -> getAll(keys));
+  }
+
+  /**
+   * Writes the entry to the backing store; a null {@code value} deletes {@code key} instead.
+   */
+  @Override
+  public void put(K key, V value) {
+    if (value != null) {
+      instrument(metrics.numPuts, metrics.putNs, () -> kvStore.put(key, value));
+    } else {
+      delete(key);
+    }
+  }
+
+  /**
+   * Runs {@link #put(Object, Object)} on the calling thread and wraps its outcome in a completed future.
+   */
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    return runCompleted(() -> put(key, value));
+  }
+
+  /**
+   * Writes all entries with a non-null value in one store call, then deletes the keys of all
+   * entries whose value is null.
+   */
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    List<Entry<K, V>> toPut = new ArrayList<>();
+    List<K> toDelete = new ArrayList<>();
+    entries.forEach(e -> {
+      if (e.getValue() != null) {
+        toPut.add(e);
+      } else {
+        toDelete.add(e.getKey());
+      }
+    });
+
+    if (!toPut.isEmpty()) {
+      instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(toPut));
+    }
+
+    if (!toDelete.isEmpty()) {
+      deleteAll(toDelete);
+    }
+  }
+
+  /**
+   * Runs {@link #putAll(List)} on the calling thread and wraps its outcome in a completed future.
+   */
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries) {
+    return runCompleted(() -> putAll(entries));
+  }
+
+  /**
+   * Removes {@code key} from the backing store.
+   */
+  @Override
+  public void delete(K key) {
+    instrument(metrics.numDeletes, metrics.deleteNs, () -> kvStore.delete(key));
+  }
+
+  /**
+   * Runs {@link #delete(Object)} on the calling thread and wraps its outcome in a completed future.
+   */
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    return runCompleted(() -> delete(key));
+  }
+
+  /**
+   * Removes all {@code keys} from the backing store in one store call.
+   */
+  @Override
+  public void deleteAll(List<K> keys) {
+    instrument(metrics.numDeleteAlls, metrics.deleteAllNs, () -> kvStore.deleteAll(keys));
+  }
+
+  /**
+   * Runs {@link #deleteAll(List)} on the calling thread and wraps its outcome in a completed future.
+   */
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    return runCompleted(() -> deleteAll(keys));
+  }
+
+  /**
+   * Flushes the backing store.
+   */
+  @Override
+  public void flush() {
+    instrument(metrics.numFlushes, metrics.flushNs, () -> kvStore.flush());
+  }
+
+  @Override
+  public void close() {
+    // The KV store is not closed here as it may still be needed by downstream operators,
+    // it will be closed by the SamzaContainer
+  }
+
+  /**
+   * Evaluates {@code func} on the calling thread and returns a future that is already completed,
+   * either with the produced value or exceptionally with the {@link Exception} that was thrown.
+   */
+  private <T> CompletableFuture<T> supplyCompleted(Supplier<T> func) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    try {
+      future.complete(func.get());
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  /**
+   * Runs {@code func} on the calling thread and returns a future that is already completed,
+   * either with null or exceptionally with the {@link Exception} that was thrown.
+   */
+  private CompletableFuture<Void> runCompleted(Runnable func) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    try {
+      func.run();
+      future.complete(null);
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  /**
+   * Increments {@code counter}, evaluates {@code func} and reports the elapsed nanoseconds to
+   * {@code timer}. The timer is left untouched when {@code func} throws.
+   */
+  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
+    incCounter(counter);
+    long startNs = clock.nanoTime();
+    T result = func.get();
+    updateTimer(timer, clock.nanoTime() - startNs);
+    return result;
+  }
+
+  /**
+   * Same as {@link #instrument(Counter, Timer, Supplier)} for operations without a result.
+   */
+  private void instrument(Counter counter, Timer timer, Runnable func) {
+    incCounter(counter);
+    long startNs = clock.nanoTime();
+    func.run();
+    updateTimer(timer, clock.nanoTime() - startNs);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6a75503d/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
index 3be61d0..5099a7e 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
@@ -20,8 +20,7 @@ package org.apache.samza.storage.kv;
import com.google.common.base.Preconditions;
import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.BaseTableProvider;
/**
@@ -53,10 +52,10 @@ public class LocalTableProvider extends BaseTableProvider {
}
@Override
- public Table getTable() {
+ public ReadWriteTable getTable() {
Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
Preconditions.checkNotNull(kvStore, "Store not initialized for table " + tableId);
- ReadableTable table = new LocalReadWriteTable(tableId, kvStore);
+ ReadWriteTable table = new LocalTable(tableId, kvStore);
table.init(this.context);
return table;
}