You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2019/05/22 21:59:17 UTC
[samza] branch master updated: SAMZA-2191: Batch support for table
APIs (#1031)
This is an automated email from the ASF dual-hosted git repository.
weisong44 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 933ce10 SAMZA-2191: Batch support for table APIs (#1031)
933ce10 is described below
commit 933ce10d8c929a729ac2fec23c4337f99440686f
Author: dengpanyin <34...@users.noreply.github.com>
AuthorDate: Wed May 22 14:59:12 2019 -0700
SAMZA-2191: Batch support for table APIs (#1031)
SAMZA-2191: Batching support for table APIs
Add a CompactBatch class to allow users to compact and batch
the operations sent to the remote store via table APIs.
---
.../learn/documentation/versioned/api/table-api.md | 30 +++
.../org/apache/samza/table/batching/Batch.java | 90 +++++++
.../apache/samza/table/batching/BatchProvider.java | 50 ++++
.../org/apache/samza/table/batching/Operation.java | 43 ++++
.../table/descriptors/RemoteTableDescriptor.java | 15 +-
.../apache/samza/table/batching/AbstractBatch.java | 76 ++++++
.../samza/table/batching/AsyncBatchingTable.java | 170 +++++++++++++
.../apache/samza/table/batching/BatchHandler.java | 38 +++
.../apache/samza/table/batching/BatchMetrics.java | 53 ++++
.../samza/table/batching/BatchProcessor.java | 194 +++++++++++++++
.../batching/BatchingNotSupportedException.java | 42 ++++
.../apache/samza/table/batching/CompactBatch.java | 84 +++++++
.../samza/table/batching/CompactBatchProvider.java | 27 ++
.../apache/samza/table/batching/CompleteBatch.java | 68 +++++
.../table/batching/CompleteBatchProvider.java | 27 ++
.../samza/table/batching/DeleteOperation.java | 64 +++++
.../apache/samza/table/batching/GetOperation.java | 81 ++++++
.../apache/samza/table/batching/PutOperation.java | 68 +++++
.../samza/table/batching/TableBatchHandler.java | 161 ++++++++++++
.../org/apache/samza/table/remote/RemoteTable.java | 15 ++
.../samza/table/remote/RemoteTableProvider.java | 18 +-
.../apache/samza/table/utils/TableMetricsUtil.java | 7 +
.../samza/table/batching/TestBatchProcessor.java | 164 ++++++++++++
.../samza/table/batching/TestBatchTable.java | 275 +++++++++++++++++++++
.../samza/table/caching/TestCachingTable.java | 2 +
.../apache/samza/table/remote/TestRemoteTable.java | 2 +-
.../samza/test/table/TestRemoteTableEndToEnd.java | 6 +-
.../table/TestRemoteTableWithBatchEndToEnd.java | 253 +++++++++++++++++++
.../org/apache/samza/test/table/TestTableData.java | 10 +
29 files changed, 2127 insertions(+), 6 deletions(-)
diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md
index d8695ba..106d27a 100644
--- a/docs/learn/documentation/versioned/api/table-api.md
+++ b/docs/learn/documentation/versioned/api/table-api.md
@@ -245,6 +245,8 @@ The table below summarizes table metrics:
| Metrics | Class | Description |
|---------|-------|-------------|
+|`num-batches`|`AsyncBatchingTable`|Number of batch operations|
+|`batch-ns`|`AsyncBatchingTable`|Time interval between opening and closing a batch|
|`get-ns`|`ReadableTable`|Average latency of `get/getAsync()` operations|
|`getAll-ns`|`ReadableTable`|Average latency of `getAll/getAllAsync()` operations|
|`num-gets`|`ReadableTable`|Count of `get/getAsync()` operations
@@ -301,6 +303,29 @@ Couchbase is supported as remote table. See
[`CouchbaseTableReadFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableReadFunction.java) and
[`CouchbaseTableWriteFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java).
+### Batching
+
+Remote Table has built-in client-side batching support for its asynchronous operations.
+This is useful when the remote data store supports batch processing and cannot efficiently
+handle a heavy volume of individual inbound requests.
+
+#### Configuration
+
+Batching can be enabled with [`RemoteTableDescriptor`](https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java)
+by providing a [`BatchProvider`](https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java) via `withBatchProvider(BatchProvider)`.
+The user can choose:
+
+1. A [`CompactBatchProvider`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java), which provides a batch in which
+ the operations are compacted by key. For update operations, a later update overrides the value of an earlier one with the same key. For query operations,
+ operations with the same key are combined into a single operation.
+2. A [`CompleteBatchProvider`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java), which provides a batch in which
+ all operations are sent to the remote store, regardless of their keys.
+3. An instance of a user-defined subclass of `BatchProvider`.
+
+For any `BatchProvider`, the user can configure the following:
+1. The max number of operations a batch can hold before being closed, via `withMaxBatchSize(int)` (default: 100).
+2. The max time a batch can stay open before being closed, via `withMaxBatchDelay(Duration)` (default: 100 ms).
+
### Rate Limiting
Remote Table has built-in client-side rate limiting support in both of its sync
@@ -608,6 +633,11 @@ We recommend:
retries at a higher and more abstract Remote Table level, which implies
retrying is not a responsibility of I/O functions.
+### Batching
+
+Samza Remote Table API can be configured to utilize user-supplied batch providers.
+You may refer to the [Batching](#batching) section under Remote Table for more details.
+
### Caching
Samza Remote Table API can be configured to utilize user-supplied caches.
diff --git a/samza-api/src/main/java/org/apache/samza/table/batching/Batch.java b/samza-api/src/main/java/org/apache/samza/table/batching/Batch.java
new file mode 100644
index 0000000..e4e0655
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/batching/Batch.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Accumulates a sequence of {@link Operation}s that will be performed together as one batch.
+ * A batch is bounded by a {@code maxBatchSize} and/or a {@code maxBatchDelay}: when the number of
+ * operations exceeds {@code maxBatchSize}, or the batch's time window exceeds {@code maxBatchDelay},
+ * the batch is performed.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public interface Batch<K, V> {
+
+  // ----- Contents -----
+
+  /**
+   * Adds an operation to this batch.
+   *
+   * @param operation The operation to be added.
+   * @return A {@link CompletableFuture} that indicates the status of the batch.
+   */
+  CompletableFuture<Void> addOperation(Operation<K, V> operation);
+
+  /**
+   * @return The operations buffered by this batch.
+   */
+  Collection<Operation<K, V>> getOperations();
+
+  /**
+   * @return The number of operations in this batch.
+   */
+  int size();
+
+  /**
+   * @return {@code true} if this batch holds no operations.
+   */
+  default boolean isEmpty() {
+    return 0 == size();
+  }
+
+  // ----- Lifecycle -----
+
+  /**
+   * Closes this batch so that it no longer accepts operations.
+   */
+  void close();
+
+  /**
+   * @return {@code true} once this batch has been closed, i.e. it can no longer accept operations.
+   */
+  boolean isClosed();
+
+  /**
+   * Marks this batch as successfully completed.
+   */
+  void complete();
+
+  /**
+   * Marks this batch as completed with a failure.
+   *
+   * @param throwable The cause of the failure.
+   */
+  void completeExceptionally(Throwable throwable);
+
+  // ----- Limits -----
+
+  /**
+   * @return The max number of operations this batch can hold.
+   */
+  int getMaxBatchSize();
+
+  /**
+   * @return The max time this batch can stay open before being performed.
+   */
+  Duration getMaxBatchDelay();
+}
\ No newline at end of file
diff --git a/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java b/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java
new file mode 100644
index 0000000..ae3ad60
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.io.Serializable;
+import java.time.Duration;
+import org.apache.samza.table.remote.TablePart;
+
+
+/**
+ * Supplies the {@link Batch} instances used by a batching table, and carries the limits
+ * ({@code maxBatchSize}, {@code maxBatchDelay}) that bound each batch.
+ *
+ * <p>Defaults: a max batch size of 100 operations and a max batch delay of 100 milliseconds.
+ *
+ * @param <K> The key type of the batched operations.
+ * @param <V> The value type of the batched operations.
+ */
+public abstract class BatchProvider<K, V> implements TablePart, Serializable {
+  private int maxBatchSize = 100;
+  private Duration maxBatchDelay = Duration.ofMillis(100);
+
+  /**
+   * @return The batch into which operations will be accumulated.
+   */
+  public abstract Batch<K, V> getBatch();
+
+  /**
+   * Sets the max number of operations a batch can hold before it is closed.
+   *
+   * @param maxBatchSize The max batch size; must be positive.
+   * @return This provider, for chaining.
+   * @throws IllegalArgumentException if {@code maxBatchSize} is not positive.
+   */
+  public BatchProvider<K, V> withMaxBatchSize(int maxBatchSize) {
+    if (maxBatchSize <= 0) {
+      throw new IllegalArgumentException("maxBatchSize must be positive, but was " + maxBatchSize);
+    }
+    this.maxBatchSize = maxBatchSize;
+    return this;
+  }
+
+  /**
+   * Sets the max time a batch can stay open before it is closed.
+   *
+   * @param maxBatchDelay The max batch delay; must be non-null and non-negative.
+   * @return This provider, for chaining.
+   * @throws IllegalArgumentException if {@code maxBatchDelay} is null or negative.
+   */
+  public BatchProvider<K, V> withMaxBatchDelay(Duration maxBatchDelay) {
+    if (maxBatchDelay == null || maxBatchDelay.isNegative()) {
+      throw new IllegalArgumentException("maxBatchDelay must be non-null and non-negative, but was " + maxBatchDelay);
+    }
+    this.maxBatchDelay = maxBatchDelay;
+    return this;
+  }
+
+  /**
+   * @return The max number of operations a batch can hold.
+   */
+  public int getMaxBatchSize() {
+    return maxBatchSize;
+  }
+
+  /**
+   * @return The max time a batch can stay open.
+   */
+  public Duration getMaxBatchDelay() {
+    return maxBatchDelay;
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/batching/Operation.java b/samza-api/src/main/java/org/apache/samza/table/batching/Operation.java
new file mode 100644
index 0000000..d7acd9a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/batching/Operation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+/**
+ * A single table operation that can be placed into a {@link Batch}.
+ *
+ * @param <K> The key type associated with the operation.
+ * @param <V> The value type associated with the operation.
+ */
+public interface Operation<K, V> {
+  /**
+   * Returns the key this operation targets.
+   *
+   * @return The key of the operation.
+   */
+  K getKey();
+
+  /**
+   * Returns the value carried by this operation. Operations created without a value
+   * (such as get and delete) presumably return {@code null}; confirm against the implementations.
+   *
+   * @return The value of the operation.
+   */
+  V getValue();
+
+  /**
+   * Returns the extra arguments passed along with the operation, e.g. the {@code args}
+   * given to {@code getAsync(key, args)}.
+   *
+   * @return The extra arguments of the operation.
+   */
+  Object[] getArgs();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 3c7b62b..b0590c5 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.table.batching.BatchProvider;
import org.apache.samza.table.remote.TablePart;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
@@ -74,6 +75,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
public static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
public static final String READ_RETRY_POLICY = "io.read.retry.policy";
public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
+ public static final String BATCH_PROVIDER = "io.batch.provider";
// Input support for a specific remote store (required)
private TableReadFunction<K, V> readFn;
@@ -84,12 +86,14 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
// Rate limiter for client-side throttling; it is set by withRateLimiter()
private RateLimiter rateLimiter;
- // Indicate whether read rate limiter is enabled or not
private boolean enableReadRateLimiter = true;
// Indicate whether write rate limiter is enabled or not
private boolean enableWriteRateLimiter = true;
+ // Batching support to reduce traffic volume sent to the remote store.
+ private BatchProvider<K, V> batchProvider;
+
// Rates for constructing the default rate limiter when they are non-zero
private Map<String, Integer> tagCreditsMap = new HashMap<>();
@@ -259,6 +263,11 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
return this;
}
+  /**
+   * Sets the provider of batches used for client-side batching of table operations.
+   *
+   * <p>When non-null, the provider is serialized into the table config under {@link #BATCH_PROVIDER};
+   * when left {@code null}, no batch provider is written to the config.
+   *
+   * @param batchProvider The provider that creates the batches, or {@code null}.
+   * @return This table descriptor.
+   */
+  public RemoteTableDescriptor<K, V> withBatchProvider(BatchProvider<K, V> batchProvider) {
+    this.batchProvider = batchProvider;
+    return this;
+  }
+
@Override
public String getProviderFactoryClassName() {
return PROVIDER_FACTORY_CLASS_NAME;
@@ -327,6 +336,10 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
addTablePartConfig(WRITE_FN, writeFn, jobConfig, tableConfig);
}
+ if (batchProvider != null) {
+ addTableConfig(BATCH_PROVIDER, SerdeUtils.serialize("batch provider", batchProvider), tableConfig);
+ addTablePartConfig(BATCH_PROVIDER, batchProvider, jobConfig, tableConfig);
+ }
return Collections.unmodifiableMap(tableConfig);
}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/AbstractBatch.java b/samza-core/src/main/java/org/apache/samza/table/batching/AbstractBatch.java
new file mode 100644
index 0000000..5f65f77
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/AbstractBatch.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Base class for {@link Batch} implementations. It holds the batch limits, the closed flag,
+ * and the future that is completed once the batch has been handled.
+ *
+ * @param <K> The type of the key associated with the operations.
+ * @param <V> The type of the value associated with the operations.
+ */
+abstract class AbstractBatch<K, V> implements Batch<K, V> {
+  /** Shared exception instance available to subclasses to signal that an operation cannot be batched. */
+  protected static final BatchingNotSupportedException BATCH_NOT_SUPPORTED_EXCEPTION = new BatchingNotSupportedException();
+
+  /** The max number of {@link Operation}s that the batch can hold. */
+  protected final int maxBatchSize;
+
+  /** The max time that the batch can last before being performed. */
+  protected final Duration maxBatchDelay;
+
+  /** Completed (normally or exceptionally) via {@link #complete()} / {@link #completeExceptionally(Throwable)}. */
+  protected final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+  /** Set by {@link #close()}; a closed batch accepts no more operations. */
+  protected boolean closed;
+
+  AbstractBatch(int maxBatchSize, Duration maxBatchDelay) {
+    this.maxBatchSize = maxBatchSize;
+    this.maxBatchDelay = maxBatchDelay;
+  }
+
+  /**
+   * @return The max number of operations this batch can hold.
+   */
+  @Override
+  public int getMaxBatchSize() {
+    return this.maxBatchSize;
+  }
+
+  /**
+   * @return The max time this batch can stay open.
+   */
+  @Override
+  public Duration getMaxBatchDelay() {
+    return this.maxBatchDelay;
+  }
+
+  @Override
+  public void complete() {
+    this.completableFuture.complete(null);
+  }
+
+  @Override
+  public void completeExceptionally(Throwable throwable) {
+    this.completableFuture.completeExceptionally(throwable);
+  }
+
+  @Override
+  public void close() {
+    this.closed = true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return this.closed;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java b/samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java
new file mode 100644
index 0000000..f432ad0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.util.HighResolutionClock;
+
+
+/**
+ * A wrapper of a {@link AsyncReadWriteTable} that supports batch operations.
+ *
+ * This batching table does not guarantee any ordering of different operation types within the batch.
+ * For instance, query(Q) and update(u) operations arrives in the following sequences, Q1, U1, Q2, U2,
+ * it does not mean the remote data store will receive the messages in the same order. Instead,
+ * the operations will be grouped by type and sent via micro batches. For this sequence, Q1 and Q2 will
+ * be grouped to micro batch B1; U1 and U2 will be grouped to micro batch B2, the implementation class
+ * can decide the order of the micro batches.
+ *
+ * Synchronized table operations (get/put/delete) should be used with caution for the batching feature.
+ * If the table is used by a single thread, there will be at most one operation in the batch, and the
+ * batch will be performed when the TTL of the batch window expires. Batching does not make sense in this scenario.
+ *
+ * The Batch implementation class can throw {@link BatchingNotSupportedException} if it thinks the operation is
+ * not batch-able. When receiving this exception, {@link AsyncBatchingTable} will send the operation to the
+ * {@link AsyncReadWriteTable}.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class AsyncBatchingTable<K, V> implements AsyncReadWriteTable<K, V> {
+ private final AsyncReadWriteTable<K, V> table;
+ private final String tableId;
+ private final BatchProvider<K, V> batchProvider;
+ private final ScheduledExecutorService batchTimerExecutorService;
+ private BatchProcessor<K, V> batchProcessor;
+
+ /**
+ * @param tableId The id of the table.
+ * @param table The target table that serves the batch operations.
+ * @param batchProvider Batch provider to create a batch instance.
+ */
+ public AsyncBatchingTable(String tableId, AsyncReadWriteTable table, BatchProvider<K, V> batchProvider,
+ ScheduledExecutorService batchTimerExecutorService) {
+ Preconditions.checkNotNull(tableId);
+ Preconditions.checkNotNull(table);
+ Preconditions.checkNotNull(batchProvider);
+ Preconditions.checkNotNull(batchTimerExecutorService);
+
+ this.tableId = tableId;
+ this.table = table;
+ this.batchProvider = batchProvider;
+ this.batchTimerExecutorService = batchTimerExecutorService;
+ }
+
+ @Override
+ public CompletableFuture<V> getAsync(K key, Object... args) {
+ try {
+ return batchProcessor.processQueryOperation(new GetOperation<>(key, args));
+ } catch (BatchingNotSupportedException e) {
+ return table.getAsync(key, args);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object... args) {
+ return table.getAllAsync(keys);
+ }
+
+ @Override
+ public <T> CompletableFuture<T> readAsync(int opId, Object ... args) {
+ return table.readAsync(opId, args);
+ }
+
+ @Override
+ public CompletableFuture<Void> putAsync(K key, V value, Object... args) {
+ try {
+ return batchProcessor.processUpdateOperation(new PutOperation<>(key, value, args));
+ } catch (BatchingNotSupportedException e) {
+ return table.putAsync(key, value, args);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object... args) {
+ return table.putAllAsync(entries);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteAsync(K key, Object... args) {
+ try {
+ return batchProcessor.processUpdateOperation(new DeleteOperation<>(key, args));
+ } catch (BatchingNotSupportedException e) {
+ return table.deleteAsync(key, args);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object... args) {
+ return table.deleteAllAsync(keys);
+ }
+
+ @Override
+ public void init(Context context) {
+ table.init(context);
+ final TableMetricsUtil metricsUtil = new TableMetricsUtil(context, this, tableId);
+
+ createBatchProcessor(TableMetricsUtil.mayCreateHighResolutionClock(context.getJobContext().getConfig()),
+ new BatchMetrics(metricsUtil));
+ }
+
+ @Override
+ public <T> CompletableFuture<T> writeAsync(int opId, Object ... args) {
+ return table.writeAsync(opId, args);
+ }
+
+ @Override
+ public void flush() {
+ table.flush();
+ }
+
+ @Override
+ public void close() {
+ batchProcessor.stop();
+ table.close();
+ }
+
+ @VisibleForTesting
+ void createBatchProcessor(HighResolutionClock clock, BatchMetrics batchMetrics) {
+ batchProcessor = new BatchProcessor<>(batchMetrics, new TableBatchHandler<>(table),
+ batchProvider, clock, batchTimerExecutorService);
+ }
+
+ @VisibleForTesting
+ BatchProcessor<K, V> getBatchProcessor() {
+ return batchProcessor;
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java
new file mode 100644
index 0000000..81256ef
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Strategy for performing a {@link Batch} once it is ready, e.g. by sending its operations
+ * to the underlying table (see {@code TableBatchHandler}).
+ *
+ * @param <K> The key type of the operations.
+ * @param <V> The value type of the operations.
+ */
+public interface BatchHandler<K, V> {
+  /**
+   * Handles the operations of the given batch.
+   *
+   * @param batch The batch to be handled.
+   * @return A {@link CompletableFuture} that indicates the status of handling the batch.
+   */
+  CompletableFuture<Void> handle(Batch<K, V> batch);
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchMetrics.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchMetrics.java
new file mode 100644
index 0000000..f160545
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchMetrics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * Holds the batching metrics of a table: a counter of batches and a timer of batch durations.
+ */
+class BatchMetrics {
+  /** Number of batches; registered as {@code num-batches}. */
+  final Counter batchCount;
+
+  /** Time between opening and closing a batch; registered as {@code batch-ns}. */
+  final Timer batchDuration;
+
+  public BatchMetrics(TableMetricsUtil metricsUtil) {
+    this.batchCount = metricsUtil.newCounter("num-batches");
+    this.batchDuration = metricsUtil.newTimer("batch-ns");
+  }
+
+  /** Increments the batch counter by one. */
+  public void incBatchCount() {
+    this.batchCount.inc();
+  }
+
+  /**
+   * Records the duration of one batch.
+   *
+   * @param duration Elapsed time of the batch; presumably in nanoseconds given the {@code batch-ns}
+   *                 metric name. Confirm with the callers.
+   */
+  public void updateBatchDuration(long duration) {
+    this.batchDuration.update(duration);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
new file mode 100644
index 0000000..bde6d48
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.util.HighResolutionClock;
+
+
+/**
+ * Places a sequence of operations into a {@link Batch}. When a batch is not allowed to accept more
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * <p>Operations are added, and batches are flushed, while holding an internal lock, both on the
+ * calling threads and inside the timer task that runs on the scheduled executor.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final BatchHandler<K, V> batchHandler;
+  private final BatchProvider<K, V> batchProvider;
+  private final BatchMetrics batchMetrics;
+  private final HighResolutionClock clock;
+  private Batch<K, V> batch;
+  private ScheduledFuture<?> scheduledFuture;
+  private long batchOpenTimestamp;
+
+  /**
+   * @param batchMetrics Metrics used to record the number of batches and how long each stays open.
+   * @param batchHandler Defines how each batch will be processed.
+   * @param batchProvider The batch provider to create a batch instance.
+   * @param clock A clock used to get the timestamp.
+   * @param scheduledExecutorService A scheduled executor service to set timers for the managed batches.
+   */
+  public BatchProcessor(BatchMetrics batchMetrics, BatchHandler<K, V> batchHandler, BatchProvider<K, V> batchProvider,
+      HighResolutionClock clock, ScheduledExecutorService scheduledExecutorService) {
+    // batchMetrics is used on every new batch, so fail fast instead of on the first operation.
+    Preconditions.checkNotNull(batchMetrics);
+    Preconditions.checkNotNull(batchHandler);
+    Preconditions.checkNotNull(batchProvider);
+    Preconditions.checkNotNull(clock);
+    Preconditions.checkNotNull(scheduledExecutorService);
+
+    this.batchHandler = batchHandler;
+    this.batchProvider = batchProvider;
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.batchMetrics = batchMetrics;
+    this.clock = clock;
+  }
+
+  /**
+   * Adds the operation to the current batch, creating the batch lazily. If the batch closes itself
+   * as a result (e.g. it became full), it is processed right away. Must be called with the lock held.
+   */
+  private CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    if (batch == null) {
+      startNewBatch();
+    }
+    final CompletableFuture<Void> res = batch.addOperation(operation);
+    if (batch.isClosed()) {
+      processBatch(true);
+    }
+    return res;
+  }
+
+  /**
+   * @param operation The query operation to be added to the batch.
+   * @return A {@link CompletableFuture} to indicate whether the operation is finished.
+   */
+  CompletableFuture<V> processQueryOperation(Operation<K, V> operation) {
+    Preconditions.checkNotNull(operation);
+    Preconditions.checkArgument(operation instanceof GetOperation);
+
+    lock.lock();
+    try {
+      GetOperation<K, V> getOperation = (GetOperation<K, V>) operation;
+      addOperation(getOperation);
+      return getOperation.getCompletableFuture();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @param operation The update operation to be added to the batch.
+   * @return A {@link CompletableFuture} to indicate whether the operation is finished.
+   */
+  CompletableFuture<Void> processUpdateOperation(Operation<K, V> operation) {
+    Preconditions.checkNotNull(operation);
+    Preconditions.checkArgument(operation instanceof PutOperation || operation instanceof DeleteOperation);
+
+    lock.lock();
+    try {
+      return addOperation(operation);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Closes the current batch, hands it to the handler and opens a new one.
+   * Must be called with the lock held.
+   *
+   * @param cancelTimer whether the current batch's timer should be cancelled first.
+   */
+  private void processBatch(boolean cancelTimer) {
+    mayCancelTimer(cancelTimer);
+    closeBatch();
+    batchHandler.handle(batch);
+    startNewBatch();
+  }
+
+  private void startNewBatch() {
+    batch = batchProvider.getBatch();
+    batchOpenTimestamp = clock.nanoTime();
+    batchMetrics.incBatchCount();
+    setBatchTimer(batch);
+  }
+
+  private void closeBatch() {
+    batch.close();
+    batchMetrics.updateBatchDuration(clock.nanoTime() - batchOpenTimestamp);
+  }
+
+  private void mayCancelTimer(boolean cancelTimer) {
+    if (cancelTimer && scheduledFuture != null) {
+      // No interrupt: a timer task that already started is blocked on the lock we hold, and it
+      // turns into a no-op once it sees that its batch is no longer the current one (see setBatchTimer).
+      scheduledFuture.cancel(false);
+    }
+  }
+
+  /**
+   * Set a timer to close the batch when the batch is older than the max delay.
+   *
+   * @param timedBatch the batch the timer is armed for.
+   */
+  private void setBatchTimer(Batch<K, V> timedBatch) {
+    final long maxDelay = timedBatch.getMaxBatchDelay().toMillis();
+    if (maxDelay != Integer.MAX_VALUE) {
+      scheduledFuture = scheduledExecutorService.schedule(() -> {
+        lock.lock();
+        try {
+          // If timedBatch was already flushed (e.g. it filled up), a cancel cannot stop a task that
+          // was already waiting on the lock; flushing now would prematurely close the next batch.
+          if (batch == timedBatch) {
+            processBatch(false);
+          }
+        } finally {
+          lock.unlock();
+        }
+      }, maxDelay, TimeUnit.MILLISECONDS);
+    } else {
+      // Do not keep a reference to a previous batch's timer.
+      scheduledFuture = null;
+    }
+  }
+
+  /**
+   * Stop the processor by cancelling the pending batch timer, if any.
+   */
+  void stop() {
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(true);
+    }
+  }
+
+  /**
+   * Get the current number of operations received.
+   */
+  @VisibleForTesting
+  int size() {
+    return batch == null ? 0 : batch.size();
+  }
+
+  /**
+   * Get the latest update operation for the specified key, or null if there is none
+   * (including when no batch has been started yet).
+   */
+  @VisibleForTesting
+  Operation<K, V> getLastUpdate(K key) {
+    if (batch == null) {
+      return null;
+    }
+    final Collection<Operation<K, V>> operations = batch.getOperations();
+    final Iterator<Operation<K, V>> iterator = operations.iterator();
+    Operation<K, V> lastUpdate = null;
+    while (iterator.hasNext()) {
+      final Operation<K, V> operation = iterator.next();
+      if ((operation instanceof PutOperation || operation instanceof DeleteOperation)
+          && operation.getKey().equals(key)) {
+        lastUpdate = operation;
+      }
+    }
+    return lastUpdate;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchingNotSupportedException.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchingNotSupportedException.java
new file mode 100644
index 0000000..ac0a19f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchingNotSupportedException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+/**
+ * Signals that a {@link Batch} implementation declines to batch a particular {@link Operation}.
+ * Batch implementations throw it from their add-operation path when the operation must not be
+ * batched.
+ */
+public class BatchingNotSupportedException extends UnsupportedOperationException {
+  /**
+   * Creates the exception without a message or cause.
+   */
+  public BatchingNotSupportedException() {
+  }
+
+  /**
+   * @param message detail message describing why batching was refused.
+   */
+  public BatchingNotSupportedException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause the underlying reason batching was refused.
+   */
+  public BatchingNotSupportedException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message detail message describing why batching was refused.
+   * @param cause the underlying reason batching was refused.
+   */
+  public BatchingNotSupportedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
new file mode 100644
index 0000000..8589505
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+/**
+ * A batch that compacts operations by key.
+ * <ul>
+ *   <li>A newer update (put or delete) for a key replaces any earlier update for the same key.</li>
+ *   <li>Query (get) operations with the same key are combined into a single query; every duplicate
+ *   query is completed with the outcome of the first one.</li>
+ * </ul>
+ * Operations carrying extra arguments are rejected.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+class CompactBatch<K, V> extends AbstractBatch<K, V> {
+  private final Map<K, Operation<K, V>> updates = new LinkedHashMap<>();
+  private final Map<K, Operation<K, V>> queries = new LinkedHashMap<>();
+
+  public CompactBatch(int maxBatchSize, Duration maxBatchDelay) {
+    super(maxBatchSize, maxBatchDelay);
+  }
+
+  /**
+   * @return The batch size: the number of distinct query keys plus the number of distinct update keys.
+   */
+  @Override
+  public int size() {
+    return updates.size() + queries.size();
+  }
+
+  /**
+   * Adds an operation, compacting it against operations already held for the same key.
+   * <ul>
+   *   <li>A GetOperation whose key already has a pending query is not sent on its own; its future is
+   *   completed (normally or exceptionally) when the pending query's future completes.</li>
+   *   <li>A Put or DeleteOperation replaces any existing update operation for the same key.</li>
+   * </ul>
+   * The batch closes itself once its size reaches {@code maxBatchSize}. An operation with non-empty
+   * extra arguments causes {@code BATCH_NOT_SUPPORTED_EXCEPTION} (inherited from
+   * {@link AbstractBatch}) to be thrown.
+   *
+   * @param operation the operation to add; must not be null.
+   * @return the batch-level future returned for every operation added to this batch.
+   */
+  @Override
+  public CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    Preconditions.checkNotNull(operation);
+
+    if (operation.getArgs() != null && operation.getArgs().length > 0) {
+      throw BATCH_NOT_SUPPORTED_EXCEPTION;
+    }
+
+    if (operation instanceof GetOperation) {
+      final Operation<K, V> pending = queries.putIfAbsent(operation.getKey(), operation);
+      if (pending != null && pending != operation) {
+        // The duplicate is never handed to the batch handler, so nothing else would complete its
+        // future; mirror the outcome of the query that will actually be executed.
+        linkToPendingQuery((GetOperation<K, V>) pending, (GetOperation<K, V>) operation);
+      }
+    } else {
+      updates.put(operation.getKey(), operation);
+    }
+    if (size() >= maxBatchSize) {
+      close();
+    }
+    return completableFuture;
+  }
+
+  /**
+   * Completes {@code duplicate} with the same value or failure as {@code pending}, once it finishes.
+   */
+  private void linkToPendingQuery(GetOperation<K, V> pending, GetOperation<K, V> duplicate) {
+    pending.getCompletableFuture().whenComplete((value, throwable) -> {
+      if (throwable != null) {
+        duplicate.completeExceptionally(throwable);
+      } else {
+        duplicate.complete(value);
+      }
+    });
+  }
+
+  /**
+   * @return The held operations: all queries first, then all updates, each in insertion order.
+   */
+  @Override
+  public Collection<Operation<K, V>> getOperations() {
+    return Stream.of(queries.values(), updates.values()).flatMap(Collection::stream).collect(Collectors.toList());
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java
new file mode 100644
index 0000000..d5ed8a8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+/**
+ * A {@link BatchProvider} whose batches are {@link CompactBatch} instances, sized and timed by this
+ * provider's max batch size and max batch delay.
+ *
+ * @param <K> The type of the key associated with the batched operations
+ * @param <V> The type of the value associated with the batched operations
+ */
+public class CompactBatchProvider<K, V> extends BatchProvider<K, V> {
+  @Override
+  public Batch<K, V> getBatch() {
+    final int maxSize = getMaxBatchSize();
+    return new CompactBatch<K, V>(maxSize, getMaxBatchDelay());
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatch.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatch.java
new file mode 100644
index 0000000..0a9aadb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatch.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * A batch that keeps every operation in arrival order, with no compaction: repeated keys are all
+ * kept.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+class CompleteBatch<K, V> extends AbstractBatch<K, V> {
+  private final List<Operation<K, V>> operations = new ArrayList<>();
+
+  public CompleteBatch(int maxBatchSize, Duration maxBatchDelay) {
+    super(maxBatchSize, maxBatchDelay);
+  }
+
+  /**
+   * @return The number of operations buffered so far.
+   */
+  @Override
+  public int size() {
+    return this.operations.size();
+  }
+
+  /**
+   * Appends the operation unchanged; the batch closes itself once it holds {@code maxBatchSize}
+   * operations.
+   */
+  @Override
+  public CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    this.operations.add(Preconditions.checkNotNull(operation));
+    final boolean full = size() >= maxBatchSize;
+    if (full) {
+      close();
+    }
+    return completableFuture;
+  }
+
+  /**
+   * @return The buffered operations, in the order they were added.
+   */
+  @Override
+  public Collection<Operation<K, V>> getOperations() {
+    return this.operations;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java
new file mode 100644
index 0000000..8068281
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+/**
+ * A {@link BatchProvider} whose batches are {@link CompleteBatch} instances, sized and timed by this
+ * provider's max batch size and max batch delay.
+ *
+ * @param <K> The type of the key associated with the batched operations
+ * @param <V> The type of the value associated with the batched operations
+ */
+public class CompleteBatchProvider<K, V> extends BatchProvider<K, V> {
+  @Override
+  public Batch<K, V> getBatch() {
+    final int maxSize = getMaxBatchSize();
+    return new CompleteBatch<K, V>(maxSize, getMaxBatchDelay());
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java b/samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java
new file mode 100644
index 0000000..3f19f6d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * An operation that removes a key from the table; it carries no value.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value; a delete always reports {@code null}.
+ */
+public class DeleteOperation<K, V> implements Operation<K, V> {
+  final K key;
+  final Object[] args;
+
+  /**
+   * @param key the key to delete; must not be null.
+   * @param args extra table-specific arguments, stored as given (not null-checked).
+   */
+  public DeleteOperation(K key, Object ... args) {
+    this.key = Preconditions.checkNotNull(key);
+    this.args = args;
+  }
+
+  /**
+   * @return The key to be deleted.
+   */
+  @Override
+  public K getKey() {
+    return this.key;
+  }
+
+  /**
+   * @return Always null, since a delete has no value.
+   */
+  @Override
+  public V getValue() {
+    return null;
+  }
+
+  /**
+   * @return The extra arguments associated with the table.
+   */
+  @Override
+  public Object[] getArgs() {
+    return this.args;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/GetOperation.java b/samza-core/src/main/java/org/apache/samza/table/batching/GetOperation.java
new file mode 100644
index 0000000..cae7779
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/GetOperation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * An operation that reads the value of a key. The result is delivered through the operation's own
+ * {@link CompletableFuture}.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value read.
+ */
+public class GetOperation<K, V> implements Operation<K, V> {
+  final K key;
+  final Object[] args;
+  final CompletableFuture<V> completableFuture = new CompletableFuture<>();
+
+  /**
+   * @param key the key to read; must not be null.
+   * @param args extra table-specific arguments; must not be null.
+   */
+  public GetOperation(K key, Object ... args) {
+    this.key = Preconditions.checkNotNull(key);
+    this.args = Preconditions.checkNotNull(args);
+  }
+
+  /**
+   * @return The key of the operation.
+   */
+  @Override
+  public K getKey() {
+    return Preconditions.checkNotNull(this.key);
+  }
+
+  /**
+   * @return Always null, since a get carries no value of its own.
+   */
+  @Override
+  public V getValue() {
+    return null;
+  }
+
+  /**
+   * @return The extra arguments associated with the table.
+   */
+  @Override
+  public Object[] getArgs() {
+    return this.args;
+  }
+
+  /**
+   * @return The future through which the read value is delivered.
+   */
+  CompletableFuture<V> getCompletableFuture() {
+    return this.completableFuture;
+  }
+
+  /**
+   * Delivers the read value to the waiting caller.
+   */
+  void complete(V val) {
+    this.completableFuture.complete(val);
+  }
+
+  /**
+   * Delivers a read failure to the waiting caller.
+   */
+  void completeExceptionally(Throwable ex) {
+    this.completableFuture.completeExceptionally(ex);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java b/samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java
new file mode 100644
index 0000000..48a75c5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * An operation that writes a value for a key.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value
+ */
+public class PutOperation<K, V> implements Operation<K, V> {
+  private final K key;
+  private final V val;
+  private final Object[] args;
+
+  /**
+   * @param key the key to write; must not be null.
+   * @param val the value to write (not null-checked).
+   * @param args extra table-specific arguments; must not be null.
+   */
+  public PutOperation(K key, V val, Object ... args) {
+    this.key = Preconditions.checkNotNull(key);
+    this.val = val;
+    this.args = Preconditions.checkNotNull(args);
+  }
+
+  /**
+   * @return The key to be put to the table.
+   */
+  @Override
+  public K getKey() {
+    return this.key;
+  }
+
+  /**
+   * @return The value to be put to the table.
+   */
+  @Override
+  public V getValue() {
+    return this.val;
+  }
+
+  /**
+   * @return The extra arguments associated with the table.
+   */
+  @Override
+  public Object[] getArgs() {
+    return this.args;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/TableBatchHandler.java b/samza-core/src/main/java/org/apache/samza/table/batching/TableBatchHandler.java
new file mode 100644
index 0000000..12ec6f6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/TableBatchHandler.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import com.google.common.base.Preconditions;
+import java.util.stream.Collectors;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+
+
+/**
+ * Defines how table performs batch operations.
+ *
+ * @param <K> The key type of the operation.
+ * @param <V> The value type of the operation.
+ */
+public class TableBatchHandler<K, V> implements BatchHandler<K, V> {
+ private final AsyncReadWriteTable<K, V> table;
+
+ public TableBatchHandler(AsyncReadWriteTable<K, V> table) {
+ Preconditions.checkNotNull(table);
+
+ this.table = table;
+ }
+
+ /**
+ * Define how batch get should be done.
+ *
+ * @param operations The batch get operations.
+ * @return A CompletableFuture to represent the status of the batch operation.
+ */
+ private CompletableFuture<?> handleBatchGet(Collection<Operation<K, V>> operations) {
+ Preconditions.checkNotNull(operations);
+ final List<K> gets = getOperationKeys(operations);
+ if (gets.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+ }
+
+ final Object[] args = getOperationArgs(operations);
+ final CompletableFuture<Map<K, V>> getsFuture = args == null ?
+ table.getAllAsync(gets) : table.getAllAsync(gets, args);
+
+ getsFuture.whenComplete((map, throwable) -> {
+ operations.forEach(operation -> {
+ GetOperation<K, V> getOperation = (GetOperation<K, V>) operation;
+ if (throwable != null) {
+ getOperation.completeExceptionally(throwable);
+ } else {
+ getOperation.complete(map.get(operation.getKey()));
+ }
+ });
+ });
+ return getsFuture;
+ }
+
+ /**
+ * Define how batch put should be done.
+ *
+ * @param operations The batch get operations.
+ * @return A CompletableFuture to represent the status of the batch operation.
+ */
+ private CompletableFuture<?> handleBatchPut(Collection<Operation<K, V>> operations) {
+ Preconditions.checkNotNull(operations);
+
+ final List<Entry<K, V>> puts = operations.stream()
+ .map(op -> new Entry<>(op.getKey(), op.getValue()))
+ .collect(Collectors.toList());
+ if (puts.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+ }
+
+ final Object[] args = getOperationArgs(operations);
+ return args == null ? table.putAllAsync(puts) : table.putAllAsync(puts, args);
+ }
+
+ /**
+ * Define how batch delete should be done.
+ *
+ * @param operations The batch get operations.
+ * @return A CompletableFuture to represent the status of the batch operation.
+ */
+ private CompletableFuture<?> handleBatchDelete(Collection<Operation<K, V>> operations) {
+ Preconditions.checkNotNull(operations);
+
+ final List<K> deletes = getOperationKeys(operations);
+ if (deletes.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+ }
+ final Object[] args = getOperationArgs(operations);
+ return args == null ? table.deleteAllAsync(deletes) : table.deleteAllAsync(deletes, args);
+ }
+
+ private List<K> getOperationKeys(Collection<Operation<K, V>> operations) {
+ return operations.stream().map(op -> op.getKey()).collect(Collectors.toList());
+ }
+
+ private Object[] getOperationArgs(Collection<Operation<K, V>> operations) {
+ if (!operations.stream().anyMatch(operation -> operation.getArgs() != null && operation.getArgs().length > 0)) {
+ return null;
+ }
+ final List<Object[]> argsList = new ArrayList<>(operations.size());
+ operations.forEach(operation -> argsList.add(operation.getArgs()));
+ return argsList.toArray();
+ }
+
+ private List<Operation<K, V>> getQueryOperations(Batch<K, V> batch) {
+ return batch.getOperations().stream().filter(op -> op instanceof GetOperation)
+ .collect(Collectors.toList());
+ }
+
+ private List<Operation<K, V>> getPutOperations(Batch<K, V> batch) {
+ return batch.getOperations().stream().filter(op -> op instanceof PutOperation)
+ .collect(Collectors.toList());
+ }
+
+ private List<Operation<K, V>> getDeleteOperations(Batch<K, V> batch) {
+ return batch.getOperations().stream().filter(op -> op instanceof DeleteOperation)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Perform batch operations.
+ */
+ @Override
+ public CompletableFuture<Void> handle(Batch<K, V> batch) {
+ return CompletableFuture.allOf(
+ handleBatchPut(getPutOperations(batch)),
+ handleBatchDelete(getDeleteOperations(batch)),
+ handleBatchGet(getQueryOperations(batch)))
+ .whenComplete((val, throwable) -> {
+ if (throwable != null) {
+ batch.completeExceptionally(throwable);
+ } else {
+ batch.complete();
+ }
+ });
+ }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
index 23f4c01..85f612c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -34,6 +34,8 @@ import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.AsyncReadWriteTable;
import org.apache.samza.table.BaseReadWriteTable;
import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batching.BatchProvider;
+import org.apache.samza.table.batching.AsyncBatchingTable;
import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
import org.apache.samza.table.retry.AsyncRetriableTable;
import org.apache.samza.table.retry.TableRetryPolicy;
@@ -88,6 +90,10 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
protected final TableRetryPolicy readRetryPolicy;
protected final TableRetryPolicy writeRetryPolicy;
protected final ScheduledExecutorService retryExecutor;
+ // batch
+ protected final BatchProvider<K, V> batchProvider;
+ protected final ScheduledExecutorService batchExecutor;
+
// Other
protected final ExecutorService callbackExecutor;
@@ -105,6 +111,8 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
* @param readRetryPolicy read retry policy
* @param writeRetryPolicy write retry policy
* @param retryExecutor executor for invoking retries
+ * @param batchProvider batch provider to create a batch instance
+ * @param batchExecutor scheduled executor for batch
* @param callbackExecutor executor for invoking async callbacks
*/
public RemoteTable(
@@ -117,6 +125,8 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
TableRetryPolicy readRetryPolicy,
TableRetryPolicy writeRetryPolicy,
ScheduledExecutorService retryExecutor,
+ BatchProvider<K, V> batchProvider,
+ ScheduledExecutorService batchExecutor,
ExecutorService callbackExecutor) {
super(tableId);
@@ -131,6 +141,8 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
this.writeRetryPolicy = writeRetryPolicy;
this.callbackExecutor = callbackExecutor;
this.retryExecutor = retryExecutor;
+ this.batchProvider = batchProvider;
+ this.batchExecutor = batchExecutor;
AsyncReadWriteTable table = new AsyncRemoteTable(readFn, writeFn);
if (readRateLimiter != null || writeRateLimiter != null) {
@@ -139,6 +151,9 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
if (readRetryPolicy != null || writeRetryPolicy != null) {
table = new AsyncRetriableTable(tableId, table, readRetryPolicy, writeRetryPolicy, retryExecutor, readFn, writeFn);
}
+ if (batchProvider != null) {
+ table = new AsyncBatchingTable(tableId, table, batchProvider, batchExecutor);
+ }
asyncTable = table;
}
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index aca0a4b..8cd1c07 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batching.BatchProvider;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.BaseTableProvider;
@@ -51,6 +52,7 @@ public class RemoteTableProvider extends BaseTableProvider {
*/
private static Map<String, ExecutorService> rateLimitingExecutors = new ConcurrentHashMap<>();
private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
+ private static Map<String, ScheduledExecutorService> batchExecutors = new ConcurrentHashMap<>();
private static ScheduledExecutorService retryExecutor;
public RemoteTableProvider(String tableId) {
@@ -116,10 +118,22 @@ public class RemoteTableProvider extends BaseTableProvider {
}));
}
+ BatchProvider batchProvider = deserializeObject(tableConfig, RemoteTableDescriptor.BATCH_PROVIDER);
+ if (batchProvider != null) {
+ batchExecutors.computeIfAbsent(tableId, (arg) ->
+ Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("table-" + tableId + "-batch-scheduled-executor");
+ thread.setDaemon(true);
+ return thread;
+ }));
+ }
+
+
RemoteTable table = new RemoteTable(tableId,
readFn, writeFn,
readRateLimiter, writeRateLimiter, rateLimitingExecutors.get(tableId),
- readRetryPolicy, writeRetryPolicy, retryExecutor,
+ readRetryPolicy, writeRetryPolicy, retryExecutor, batchProvider, batchExecutors.get(tableId),
callbackExecutors.get(tableId));
table.init(this.context);
tables.add(table);
@@ -134,6 +148,8 @@ public class RemoteTableProvider extends BaseTableProvider {
rateLimitingExecutors.clear();
callbackExecutors.values().forEach(e -> e.shutdown());
callbackExecutors.clear();
+ batchExecutors.values().forEach(e -> e.shutdown());
+ batchExecutors.clear();
}
private <T> T deserializeObject(JavaTableConfig tableConfig, String key) {
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
index 90d6fe8..bb8abed 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
@@ -20,6 +20,8 @@
package org.apache.samza.table.utils;
import com.google.common.base.Preconditions;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
@@ -29,6 +31,7 @@ import org.apache.samza.table.Table;
import org.apache.samza.table.caching.SupplierGauge;
import java.util.function.Supplier;
+import org.apache.samza.util.HighResolutionClock;
/**
@@ -103,4 +106,18 @@ public class TableMetricsUtil {
 return String.format("%s-%s", tableId, name);
 }
+  /**
+   * Returns a {@link HighResolutionClock} that reads {@link System#nanoTime()} when metrics timers are
+   * enabled in {@code config} (per {@link MetricsConfig#getMetricsTimerEnabled()}); otherwise returns a
+   * clock that always reports 0.
+   *
+   * @param config configuration from which the {@link MetricsConfig} is built
+   * @return a nanosecond clock, or a constant-zero clock when metrics timers are disabled
+   */
+  public static HighResolutionClock mayCreateHighResolutionClock(Config config) {
+    if (new MetricsConfig(config).getMetricsTimerEnabled()) {
+      return System::nanoTime;
+    }
+    return () -> 0;
+  }
}
diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
new file mode 100644
index 0000000..dca7e75
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static java.lang.Thread.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link BatchProcessor}, organized as a suite of independent nested test classes.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    TestBatchProcessor.TestCreate.class,
+    TestBatchProcessor.TestUpdatesAndLookup.class,
+    TestBatchProcessor.TestBatchTriggered.class
+  })
+public class TestBatchProcessor {
+  private static final int SLOW_OPERATION_TIME_MS = 500;
+
+  /** Simulates a slow remote write: blocks for {@link #SLOW_OPERATION_TIME_MS} ms, then returns null. */
+  private static final Supplier<Void> SLOW_UPDATE_SUPPLIER = () -> {
+    sleepQuietly(SLOW_OPERATION_TIME_MS);
+    return null;
+  };
+
+  public static class TestCreate {
+    @Test
+    public void testCreate() {
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      final BatchProcessor<Integer, Integer> batchProcessor = createBatchProcessor(table,
+          3, Integer.MAX_VALUE);
+
+      // The batch processor initially has no operation.
+      Assert.assertEquals(0, batchProcessor.size());
+      batchProcessor.processUpdateOperation(new PutOperation<>(1, 1));
+      // The batch processor now has one operation.
+      Assert.assertEquals(1, batchProcessor.size());
+    }
+  }
+
+  public static class TestUpdatesAndLookup {
+    @Test
+    public void testUpdateAndLookup() {
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      final BatchProcessor<Integer, Integer> batchProcessor =
+          createBatchProcessor(table, Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+      int numberOfPuts = 10;
+      for (int i = 0; i < numberOfPuts; i++) {
+        batchProcessor.processUpdateOperation(new PutOperation<>(i, i));
+      }
+
+      // Verify that every put is still buffered, since neither the size nor the delay limit is reached.
+      Assert.assertEquals(numberOfPuts, batchProcessor.size());
+
+      // Verify that the value is correct for each key.
+      for (int i = 0; i < numberOfPuts; i++) {
+        final Operation<Integer, Integer> operation = batchProcessor.getLastUpdate(i);
+        Assert.assertEquals(i, operation.getKey().intValue());
+        Assert.assertEquals(i, operation.getValue().intValue());
+      }
+    }
+  }
+
+  public static class TestBatchTriggered {
+    @Test
+    public void testBatchOperationTriggeredByBatchSize() {
+      final int maxBatchSize = 3;
+
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      // Build the slow future when putAllAsync is actually invoked (not when the stub is declared),
+      // so the simulated latency starts when the batch is sent.
+      when(table.putAllAsync(anyList()))
+          .thenAnswer(invocation -> CompletableFuture.supplyAsync(SLOW_UPDATE_SUPPLIER));
+
+      final BatchProcessor<Integer, Integer> batchProcessor =
+          createBatchProcessor(table, maxBatchSize, Integer.MAX_VALUE);
+
+      List<CompletableFuture<Void>> futureList = new ArrayList<>();
+      // One batch will be created and sent to the remote table after the for-loop.
+      for (int i = 0; i < maxBatchSize; i++) {
+        futureList.add(batchProcessor.processUpdateOperation(new PutOperation<>(i, i)));
+      }
+
+      // The batched write is still in flight, so no operation future has completed yet.
+      for (int i = 0; i < maxBatchSize; i++) {
+        Assert.assertFalse(futureList.get(i).isDone());
+      }
+      Assert.assertEquals(0, batchProcessor.size());
+
+      sleepQuietly(SLOW_OPERATION_TIME_MS * 2);
+
+      for (int i = 0; i < maxBatchSize; i++) {
+        Assert.assertTrue(futureList.get(i).isDone());
+      }
+    }
+
+    @Test
+    public void testBatchOperationTriggeredByTimer() {
+      final int maxBatchDelayMs = 100;
+      final int putOperationCount = 100;
+
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      when(table.putAllAsync(any())).thenReturn(CompletableFuture.completedFuture(null));
+      when(table.deleteAllAsync(anyList())).thenReturn(CompletableFuture.completedFuture(null));
+
+      final BatchProcessor<Integer, Integer> batchProcessor =
+          createBatchProcessor(table, Integer.MAX_VALUE, maxBatchDelayMs);
+
+      for (int i = 0; i < putOperationCount; i++) {
+        batchProcessor.processUpdateOperation(new PutOperation<>(i, i));
+      }
+
+      // There's one batch with infinite maximum size, it has 100ms maximum delay.
+      Assert.assertEquals(putOperationCount, batchProcessor.size());
+
+      sleepQuietly(maxBatchDelayMs * 2);
+
+      // After the timer fired, a new batch will be created.
+      Assert.assertEquals(0, batchProcessor.size());
+    }
+  }
+
+  /**
+   * Creates a {@link BatchProcessor} that hands batches built by a {@link CompactBatchProvider} to {@code table}.
+   *
+   * <p>The scheduled executor runs on a daemon thread so that a test which leaves a batch timer pending
+   * (e.g. with a very large max delay) cannot keep the JVM alive after the test finishes.
+   *
+   * @param table table that receives the batched operations
+   * @param maxSize maximum number of operations per batch
+   * @param maxDelay maximum batch delay, in milliseconds
+   * @return a new batch processor
+   */
+  private static BatchProcessor<Integer, Integer> createBatchProcessor(ReadWriteTable<Integer, Integer> table,
+      int maxSize, int maxDelay) {
+    final BatchProvider<Integer, Integer> batchProvider = new CompactBatchProvider<Integer, Integer>()
+        .withMaxBatchDelay(Duration.ofMillis(maxDelay)).withMaxBatchSize(maxSize);
+    final BatchHandler<Integer, Integer> batchHandler = new TableBatchHandler<>(table);
+    final BatchMetrics batchMetrics = mock(BatchMetrics.class);
+    final ScheduledExecutorService batchExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
+      final Thread thread = new Thread(runnable, "test-batch-processor");
+      thread.setDaemon(true);
+      return thread;
+    });
+    return new BatchProcessor<>(batchMetrics, batchHandler, batchProvider, () -> 0, batchExecutor);
+  }
+
+  /**
+   * Sleeps for {@code millis} milliseconds. If interrupted, returns early and restores the thread's
+   * interrupt status rather than discarding it.
+   */
+  private static void sleepQuietly(long millis) {
+    try {
+      sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchTable.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchTable.java
new file mode 100644
index 0000000..b1bd4a3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchTable.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link AsyncBatchingTable} layered over a mocked {@link ReadWriteTable} whose operations read
+ * and write an in-memory map.
+ */
+public class TestBatchTable {
+  private static final int BATCH_SIZE = 5;
+  // A delay this large means batches in these tests are flushed by reaching BATCH_SIZE, not by the timer.
+  private static final Duration BATCH_DELAY = Duration.ofMillis(Integer.MAX_VALUE);
+
+  private AsyncBatchingTable<Integer, Integer> asyncBatchingTable;
+  private ReadWriteTable<Integer, Integer> table;
+  private Map<Integer, Integer> tableDb;
+  private ScheduledExecutorService batchExecutor;
+
+  @Before
+  public void setup() {
+    final Answer getAnswer = invocation -> {
+      Integer key = invocation.getArgumentAt(0, Integer.class);
+      return tableDb.get(key);
+    };
+
+    final Answer getAsyncAnswer = invocation -> {
+      Integer key = invocation.getArgumentAt(0, Integer.class);
+      return CompletableFuture.completedFuture(tableDb.get(key));
+    };
+
+    final Answer getAllAsyncAnswer = invocation -> {
+      final List<Integer> list = invocation.getArgumentAt(0, List.class);
+      final Map<Integer, Integer> map = new HashMap<>();
+      list.forEach(k -> map.put(k, tableDb.get(k)));
+      return CompletableFuture.completedFuture(map);
+    };
+
+    final Answer putAnswer = invocation -> {
+      Integer key = invocation.getArgumentAt(0, Integer.class);
+      Integer value = invocation.getArgumentAt(1, Integer.class);
+      tableDb.put(key, value);
+      return null;
+    };
+
+    final Answer putAsyncAnswer = invocation -> {
+      final Integer key = invocation.getArgumentAt(0, Integer.class);
+      final Integer value = invocation.getArgumentAt(1, Integer.class);
+      tableDb.put(key, value);
+      return CompletableFuture.completedFuture(null);
+    };
+
+    final Answer putAllAsyncAnswer = invocation -> {
+      final List<Entry<Integer, Integer>> list = invocation.getArgumentAt(0, List.class);
+      list.forEach(entry -> tableDb.put(entry.getKey(), entry.getValue()));
+      return CompletableFuture.completedFuture(null);
+    };
+
+    final Answer deleteAnswer = invocation -> {
+      final Integer key = invocation.getArgumentAt(0, Integer.class);
+      tableDb.remove(key);
+      return null;
+    };
+
+    final Answer deleteAsyncAnswer = invocation -> {
+      final Integer key = invocation.getArgumentAt(0, Integer.class);
+      tableDb.remove(key);
+      return CompletableFuture.completedFuture(null);
+    };
+
+    final Answer deleteAllAsyncAnswer = invocation -> {
+      final List<Integer> list = invocation.getArgumentAt(0, List.class);
+      list.forEach(k -> tableDb.remove(k));
+      return CompletableFuture.completedFuture(null);
+    };
+
+    table = mock(ReadWriteTable.class);
+    final BatchMetrics batchMetrics = mock(BatchMetrics.class);
+    tableDb = new HashMap<>();
+    batchExecutor = Executors.newSingleThreadScheduledExecutor();
+    asyncBatchingTable = new AsyncBatchingTable("id", table, new CompactBatchProvider()
+        .withMaxBatchSize(BATCH_SIZE)
+        .withMaxBatchDelay(BATCH_DELAY), batchExecutor);
+    asyncBatchingTable.createBatchProcessor(() -> 0, batchMetrics);
+
+    doAnswer(putAnswer).when(table).put(anyInt(), anyInt());
+    doAnswer(putAsyncAnswer).when(table).putAsync(anyInt(), anyInt());
+    doAnswer(putAllAsyncAnswer).when(table).putAllAsync(anyList());
+
+    doAnswer(deleteAnswer).when(table).delete(anyInt());
+    doAnswer(deleteAsyncAnswer).when(table).deleteAsync(anyInt());
+    doAnswer(deleteAllAsyncAnswer).when(table).deleteAllAsync(anyList());
+
+    doAnswer(getAnswer).when(table).get(anyInt());
+    doAnswer(getAsyncAnswer).when(table).getAsync(anyInt());
+    doAnswer(getAllAsyncAnswer).when(table).getAllAsync(anyList());
+  }
+
+  @After
+  public void tearDown() {
+    asyncBatchingTable.close();
+    // shutdownNow() also cancels any delayed task (e.g. a pending batch timer) that shutdown() would still run.
+    batchExecutor.shutdownNow();
+  }
+
+  @Test
+  public void testPutAsync() {
+    final List<CompletableFuture<Void>> futures = new LinkedList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      futures.add(asyncBatchingTable.putAsync(i, i));
+    }
+    sleep();
+
+    final BatchProcessor<Integer, Integer> batchProcessor = asyncBatchingTable.getBatchProcessor();
+
+    // Verify that all async puts are finished.
+    futures.forEach(future -> Assert.assertTrue(future.isDone()));
+    verify(table, times(1)).putAllAsync(any());
+
+    // There should be no operations in the batch processor.
+    Assert.assertEquals(0, batchProcessor.size());
+
+    asyncBatchingTable.putAsync(BATCH_SIZE, BATCH_SIZE);
+
+    // Now batch size should be 1.
+    Assert.assertEquals(1, batchProcessor.size());
+  }
+
+  @Test
+  public void testPutAllAsync() {
+    final List<Entry<Integer, Integer>> entries = new LinkedList<>();
+
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      entries.add(new Entry<>(i, i));
+    }
+
+    CompletableFuture<Void> future = asyncBatchingTable.putAllAsync(entries);
+    final BatchProcessor<Integer, Integer> batchProcessor = asyncBatchingTable.getBatchProcessor();
+
+    sleep();
+
+    // Verify that putAllAsync is finished.
+    Assert.assertTrue(future.isDone());
+
+    // There should be no pending operations.
+    Assert.assertEquals(0, batchProcessor.size());
+
+    // The batched puts reach the underlying table as a single putAllAsync call.
+    verify(table, times(1)).putAllAsync(anyList());
+
+    // This extra put leaves exactly one operation in the new batch.
+    asyncBatchingTable.putAsync(BATCH_SIZE, BATCH_SIZE);
+
+    Assert.assertEquals(1, batchProcessor.size());
+  }
+
+  @Test
+  public void testGetAsync() throws ExecutionException, InterruptedException {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    final List<CompletableFuture<Integer>> futures = new ArrayList<>(BATCH_SIZE);
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      futures.add(asyncBatchingTable.getAsync(i));
+    }
+    sleep();
+
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      Assert.assertTrue(futures.get(i).isDone());
+      Assert.assertEquals(Integer.valueOf(i), futures.get(i).get());
+    }
+    verify(table, times(1)).getAllAsync(anyList());
+  }
+
+  @Test
+  public void testGetAllAsync() throws ExecutionException, InterruptedException {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    final List<Integer> keys = new LinkedList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      keys.add(i);
+    }
+
+    CompletableFuture<Map<Integer, Integer>> future = asyncBatchingTable.getAllAsync(keys);
+    sleep();
+
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(BATCH_SIZE, future.get().size());
+
+    verify(table, times(1)).getAllAsync(anyList());
+  }
+
+  @Test
+  public void testDeleteAsync() throws Exception {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    // The 1st batch is done.
+    verify(table, times(1)).putAllAsync(anyList());
+
+    final List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      completableFutures.add(asyncBatchingTable.deleteAsync(i));
+    }
+    sleep();
+
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      Assert.assertNull(completableFutures.get(i).get());
+    }
+  }
+
+  @Test
+  public void testDeleteAllAsync() throws Exception {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    final List<Integer> keys = new LinkedList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      keys.add(i);
+    }
+
+    final CompletableFuture<Void> future = asyncBatchingTable.deleteAllAsync(keys);
+    sleep();
+    Assert.assertTrue(future.isDone());
+
+    final CompletableFuture<Map<Integer, Integer>> getAllFuture = asyncBatchingTable.getAllAsync(keys);
+    sleep();
+
+    Assert.assertTrue(getAllFuture.isDone());
+    getAllFuture.get().forEach((k, v) -> Assert.assertNull(v));
+  }
+
+  /**
+   * Blocks for one second to give asynchronous batch processing time to complete. If interrupted,
+   * returns early and restores the thread's interrupt status.
+   */
+  private void sleep() {
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 44efc5b..d873c16 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -286,6 +286,7 @@ public class TestCachingTable {
tableId + "-remote", readFn, writeFn,
rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
null, null, null,
+ null, null,
Executors.newSingleThreadExecutor());
final CachingTable<String, String> cachingTable = new CachingTable<>(
@@ -407,6 +408,7 @@ public class TestCachingTable {
tableId, readFn, writeFn,
rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
null, null, null,
+ null, null,
Executors.newSingleThreadExecutor());
final CachingTable<String, String> cachingTable = new CachingTable<>(
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 02625bb..7a98504 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -86,7 +86,7 @@ public class TestRemoteTable {
RemoteTable<K, V> table = new RemoteTable(tableId, readFn, writeFn,
readRateLimiter, writeRateLimiter, rateLimitingExecutor,
- readPolicy, writePolicy, retryExecutor, cbExecutor);
+ readPolicy, writePolicy, retryExecutor, null, null, cbExecutor);
table.init(getMockContext());
verify(readFn, times(1)).init(any(), any());
if (writeFn != null) {
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
index 0a3b13e..a4561ee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
@@ -474,7 +474,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
RemoteTable<String, String> table = new RemoteTable<>("table1", reader, null,
rateLimitHelper, null, Executors.newSingleThreadExecutor(),
null, null, null,
- null);
+ null, null, null);
table.init(createMockContext());
table.get("abc");
}
@@ -490,7 +490,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
RemoteTable<String, String> table = new RemoteTable<String, String>("table1", reader, writer,
rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
null, null, null,
- null);
+ null, null, null);
table.init(createMockContext());
table.put("abc", "efg");
}
@@ -502,7 +502,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
RemoteTable<String, String> table = new RemoteTable<String, String>("table1", reader, null,
rateLimitHelper, null, Executors.newSingleThreadExecutor(),
null, null, null,
- null);
+ null, null, null);
table.init(createMockContext());
try {
table.put("abc", "efg");
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
new file mode 100644
index 0000000..3cad14e
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.BaseTableFunction;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * End-to-end tests for stream-table joins against remote tables that batch reads, writes, or both.
+ */
+public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
+
+  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+  static Map<String, AtomicInteger> batchWrites = new HashMap<>();
+  static Map<String, AtomicInteger> batchReads = new HashMap<>();
+
+  /**
+   * Serves profiles from an in-memory map and counts each {@code getAllAsync} call in the
+   * {@code batchReads} counter registered for the test.
+   */
+  static class InMemoryReadFunction extends BaseTableFunction
+      implements TableReadFunction<Integer, Profile> {
+    private final String serializedProfiles;
+    private final String testName;
+    private transient Map<Integer, Profile> profileMap;
+    private transient AtomicInteger batchReadCounter;
+
+    private InMemoryReadFunction(String testName, String profiles) {
+      this.testName = testName;
+      this.serializedProfiles = profiles;
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
+      this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
+      batchReadCounter = batchReads.get(testName);
+    }
+
+    @Override
+    public CompletableFuture<Profile> getAsync(Integer key) {
+      return CompletableFuture.completedFuture(profileMap.get(key));
+    }
+
+    @Override
+    public CompletableFuture<Map<Integer, Profile>> getAllAsync(Collection<Integer> keys) {
+      batchReadCounter.incrementAndGet();
+      return TableReadFunction.super.getAllAsync(keys);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+
+    static InMemoryReadFunction getInMemoryReadFunction(String testName, String serializedProfiles) {
+      return new InMemoryReadFunction(testName, serializedProfiles);
+    }
+  }
+
+  /**
+   * Appends written records to the {@code writtenRecords} list registered for the test and counts each
+   * {@code putAllAsync} call in the matching {@code batchWrites} counter.
+   */
+  static class InMemoryWriteFunction extends BaseTableFunction
+      implements TableWriteFunction<Integer, EnrichedPageView> {
+    private transient List<EnrichedPageView> records;
+    private transient AtomicInteger batchWritesCounter;
+    private final String testName;
+
+    public InMemoryWriteFunction(String testName) {
+      this.testName = testName;
+    }
+
+    // Verify serializable functionality
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+
+      // Write to the global list for verification
+      records = writtenRecords.get(testName);
+      batchWritesCounter = batchWrites.get(testName);
+    }
+
+    @Override
+    public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
+      records.add(record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Integer key) {
+      // records holds EnrichedPageView values, so List.remove(key) would never match; remove by member id.
+      records.removeIf(pageView -> key.equals(pageView.getMemberId()));
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> putAllAsync(Collection<Entry<Integer, EnrichedPageView>> entries) {
+      batchWritesCounter.incrementAndGet();
+      return TableWriteFunction.super.putAllAsync(entries);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  /** Dummy read function for the output table, which the job only writes to. */
+  static class MyReadFunction extends BaseTableFunction
+      implements TableReadFunction {
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      // Return a completed empty result rather than a null future if a lookup ever happens.
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  /**
+   * Runs a stream-table join of generated page views against a remote profile table and writes the enriched
+   * page views to a remote output table, with batching optionally enabled on either table.
+   *
+   * @param testName key under which this run's written records and batch counters are registered
+   * @param batchRead whether the profile (input) table batches its reads
+   * @param batchWrite whether the enriched page view (output) table batches its writes
+   */
+  private void doTestStreamTableJoinRemoteTable(String testName, boolean batchRead, boolean batchWrite) throws Exception {
+    final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
+
+    batchReads.put(testName, new AtomicInteger());
+    batchWrites.put(testName, new AtomicInteger());
+    writtenRecords.put(testName, new CopyOnWriteArrayList<>());
+
+    final int count = 16;
+    final int batchSize = 4;
+    PageView[] pageViews = generatePageViewsWithDistinctKeys(count);
+    String profiles = Base64Serializer.serialize(generateProfiles(count));
+
+    int partitionCount = 1;
+    Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+    configs.put("task.max.concurrency", String.valueOf(count));
+    configs.put("task.async.commit", String.valueOf(true));
+
+    final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+    final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+    final TableRateLimiter.CreditFunction creditFunction = (k, v, args) -> 1;
+    final StreamApplication app = appDesc -> {
+      RemoteTableDescriptor<Integer, Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
+      inputTableDesc
+          .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(testName, profiles))
+          .withRateLimiter(readRateLimiter, creditFunction, null);
+      if (batchRead) {
+        inputTableDesc.withBatchProvider(new CompactBatchProvider().withMaxBatchSize(batchSize).withMaxBatchDelay(Duration.ofHours(1)));
+      }
+
+      // dummy reader
+      TableReadFunction readFn = new MyReadFunction();
+
+      RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
+      outputTableDesc
+          .withReadFunction(readFn)
+          .withWriteFunction(writer)
+          .withRateLimiter(writeRateLimiter, creditFunction, creditFunction);
+      if (batchWrite) {
+        outputTableDesc.withBatchProvider(new CompactBatchProvider().withMaxBatchSize(batchSize).withMaxBatchDelay(Duration.ofHours(1)));
+      }
+
+      Table<KV<Integer, EnrichedPageView>> outputTable = appDesc.getTable(outputTableDesc);
+
+      Table<KV<Integer, Profile>> inputTable = appDesc.getTable(inputTableDesc);
+
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.getInputStream(isd)
+          .map(pv -> new KV<>(pv.getMemberId(), pv))
+          .join(inputTable, new PageViewToProfileJoinFunction())
+          .map(m -> new KV(m.getMemberId(), m))
+          .sendTo(outputTable);
+    };
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    executeRun(runner, config);
+
+    runner.waitForFinish();
+    int numExpected = count * partitionCount;
+    Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
+    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
+
+    if (batchRead) {
+      Assert.assertEquals(numExpected / batchSize, batchReads.get(testName).get());
+    }
+    if (batchWrite) {
+      Assert.assertEquals(numExpected / batchSize, batchWrites.get(testName).get());
+    }
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableBatchingReadWrite() throws Exception {
+    doTestStreamTableJoinRemoteTable("testStreamTableJoinRemoteTableBatchingReadWrite", true, true);
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableBatchingRead() throws Exception {
+    doTestStreamTableJoinRemoteTable("testStreamTableJoinRemoteTableBatchingRead", true, false);
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableBatchingWrite() throws Exception {
+    doTestStreamTableJoinRemoteTable("testStreamTableJoinRemoteTableBatchingWrite", false, true);
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index ed73961..21e57a8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -205,6 +205,25 @@ public class TestTableData {
 return pageviews;
 }
+  /**
+   * Generates {@code count} page views with distinct keys: the i-th page view is constructed with {@code i}
+   * as its member id (assumed to be the second {@code PageView} constructor argument) and a page key
+   * chosen at random from {@code PAGEKEYS}.
+   *
+   * @param count number of page views to generate
+   * @return the generated page views
+   */
+  static public PageView[] generatePageViewsWithDistinctKeys(int count) {
+    Random random = new Random();
+    PageView[] pageviews = new PageView[count];
+    for (int i = 0; i < count; i++) {
+      // nextInt's bound is exclusive, so PAGEKEYS.length (not length - 1) lets every page key be chosen.
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length)];
+      pageviews[i] = new PageView(pagekey, i);
+    }
+    return pageviews;
+  }
+
private static final String[] COMPANIES = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
static public Profile[] generateProfiles(int count) {