Posted to commits@samza.apache.org by we...@apache.org on 2019/05/22 21:59:17 UTC

[samza] branch master updated: SAMZA-2191: Batch support for table APIs (#1031)

This is an automated email from the ASF dual-hosted git repository.

weisong44 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 933ce10  SAMZA-2191: Batch support for table APIs (#1031)
933ce10 is described below

commit 933ce10d8c929a729ac2fec23c4337f99440686f
Author: dengpanyin <34...@users.noreply.github.com>
AuthorDate: Wed May 22 14:59:12 2019 -0700

    SAMZA-2191: Batch support for table APIs (#1031)
    
    SAMZA-2191: Batching support for table APIs
    
    Add a CompactBatch class to allow users to compact and batch
    the operations sent to the remote store via table APIs.
---
 .../learn/documentation/versioned/api/table-api.md |  30 +++
 .../org/apache/samza/table/batching/Batch.java     |  90 +++++++
 .../apache/samza/table/batching/BatchProvider.java |  50 ++++
 .../org/apache/samza/table/batching/Operation.java |  43 ++++
 .../table/descriptors/RemoteTableDescriptor.java   |  15 +-
 .../apache/samza/table/batching/AbstractBatch.java |  76 ++++++
 .../samza/table/batching/AsyncBatchingTable.java   | 170 +++++++++++++
 .../apache/samza/table/batching/BatchHandler.java  |  38 +++
 .../apache/samza/table/batching/BatchMetrics.java  |  53 ++++
 .../samza/table/batching/BatchProcessor.java       | 194 +++++++++++++++
 .../batching/BatchingNotSupportedException.java    |  42 ++++
 .../apache/samza/table/batching/CompactBatch.java  |  84 +++++++
 .../samza/table/batching/CompactBatchProvider.java |  27 ++
 .../apache/samza/table/batching/CompleteBatch.java |  68 +++++
 .../table/batching/CompleteBatchProvider.java      |  27 ++
 .../samza/table/batching/DeleteOperation.java      |  64 +++++
 .../apache/samza/table/batching/GetOperation.java  |  81 ++++++
 .../apache/samza/table/batching/PutOperation.java  |  68 +++++
 .../samza/table/batching/TableBatchHandler.java    | 161 ++++++++++++
 .../org/apache/samza/table/remote/RemoteTable.java |  15 ++
 .../samza/table/remote/RemoteTableProvider.java    |  18 +-
 .../apache/samza/table/utils/TableMetricsUtil.java |   7 +
 .../samza/table/batching/TestBatchProcessor.java   | 164 ++++++++++++
 .../samza/table/batching/TestBatchTable.java       | 275 +++++++++++++++++++++
 .../samza/table/caching/TestCachingTable.java      |   2 +
 .../apache/samza/table/remote/TestRemoteTable.java |   2 +-
 .../samza/test/table/TestRemoteTableEndToEnd.java  |   6 +-
 .../table/TestRemoteTableWithBatchEndToEnd.java    | 253 +++++++++++++++++++
 .../org/apache/samza/test/table/TestTableData.java |  10 +
 29 files changed, 2127 insertions(+), 6 deletions(-)

diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md
index d8695ba..106d27a 100644
--- a/docs/learn/documentation/versioned/api/table-api.md
+++ b/docs/learn/documentation/versioned/api/table-api.md
@@ -245,6 +245,8 @@ The table below summarizes table metrics:
 
 | Metrics | Class | Description |
 |---------|-------|-------------|
+|`num-batches`|`AsyncBatchingTable`|Number of batches created|
+|`batch-ns`|`AsyncBatchingTable`|Time interval between opening and closing a batch|
 |`get-ns`|`ReadableTable`|Average latency of `get/getAsync()` operations|
 |`getAll-ns`|`ReadableTable`|Average latency of `getAll/getAllAsync()` operations|
 |`num-gets`|`ReadableTable`|Count of `get/getAsync()` operations
@@ -301,6 +303,29 @@ Couchbase is supported as remote table. See
 [`CouchbaseTableReadFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableReadFunction.java) and 
 [`CouchbaseTableWriteFunction`](https://github.com/apache/samza/blob/master/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java).
 
+### Batching
+
+Remote Table has built-in client-side batching support for its async operations.
+This is useful when a remote data store supports batch processing but cannot
+handle a heavy volume of individual inbound requests.
+
+#### Configuration
+
+Batching can be enabled with [`RemoteTableDescriptor`](https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java)
+by providing a [`BatchProvider`](https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java).
+The user can choose:
+
+1. A [`CompactBatchProvider`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java), which provides batches that compact
+   operations by key: for update operations, a later update overrides the value of an earlier one with the same key; for query operations,
+   queries with the same key are combined into a single query.
+2. A [`CompleteBatchProvider`](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java), which provides batches that
+   preserve every operation, so all of them are sent to the remote store regardless of their keys.
+3. A user-defined implementation of `BatchProvider`.
+
+For each `BatchProvider`, the user can configure the following (see the sketch below):
+1. The max number of operations a batch can hold before being closed, via `withMaxBatchSize(int)`.
+2. The max time a batch can stay open before being closed, via `withMaxBatchDelay(Duration)`.
+
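For illustration, a minimal sketch of such a configuration, assuming `readFn` and `writeFn` are existing `TableReadFunction`/`TableWriteFunction` implementations for the remote store, and `"remote-table"` and `Profile` are placeholder table id and value types:

```java
import java.time.Duration;
import org.apache.samza.table.batching.CompactBatchProvider;
import org.apache.samza.table.descriptors.RemoteTableDescriptor;

// Compact and batch operations before they reach the remote store.
RemoteTableDescriptor<String, Profile> profileTable =
    new RemoteTableDescriptor<String, Profile>("remote-table")
        .withReadFunction(readFn)
        .withWriteFunction(writeFn)
        .withBatchProvider(new CompactBatchProvider<String, Profile>()
            .withMaxBatchSize(200)                       // close after 200 operations...
            .withMaxBatchDelay(Duration.ofMillis(50)));  // ...or after 50 ms, whichever comes first
```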
 ### Rate Limiting
 
 Remote Table has built-in client-side rate limiting support in both of its sync 
@@ -608,6 +633,11 @@ We recommend:
    retries at a higher and more abstract Remote Table level, which implies 
    retrying is not a responsibility of I/O functions.
 
+### Batching
+
+Samza Remote Table API can be configured to utilize user-supplied batch providers.
+You may refer to the [Batching](#batching) section under Remote Table for more details.
+
 ### Caching
 
 Samza Remote Table API can be configured to utilize user-supplied caches. 
diff --git a/samza-api/src/main/java/org/apache/samza/table/batching/Batch.java b/samza-api/src/main/java/org/apache/samza/table/batching/Batch.java
new file mode 100644
index 0000000..e4e0655
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/batching/Batch.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manages a sequence of {@link Operation}s, which will be performed as a batch.
+ * A batch can be configured with a {@code maxBatchSize} and/or {@code maxBatchDelay}.
+ * When the number of operations in the batch reaches {@code maxBatchSize}
+ * or the batch has been open longer than {@code maxBatchDelay}, the batch will be performed.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public interface Batch<K, V> {
+  /**
+   * Add an operation to the batch.
+   *
+   * @param operation The operation to be added.
+   * @return A {@link CompletableFuture} that indicates the status of the batch.
+   */
+  CompletableFuture<Void> addOperation(Operation<K, V> operation);
+
+  /**
+   * Close the batch so that it will not accept more operations.
+   */
+  void close();
+
+  /**
+   * @return Whether the batch is closed and can no longer accept operations.
+   */
+  boolean isClosed();
+
+  /**
+   * @return The operations buffered by the batch.
+   */
+  Collection<Operation<K, V>> getOperations();
+
+  /**
+   * @return The batch max delay.
+   */
+  Duration getMaxBatchDelay();
+
+  /**
+   * @return The batch capacity
+   */
+  int getMaxBatchSize();
+
+  /**
+   * Change the batch status to be complete.
+   */
+  void complete();
+
+  /**
+   * Change the batch status to be complete with exception.
+   */
+  void completeExceptionally(Throwable throwable);
+
+  /**
+   * @return The number of operations in the batch.
+   */
+  int size();
+
+  /**
+   * @return True if the batch is empty.
+   */
+  default boolean isEmpty() {
+    return size() == 0;
+  }
+}
\ No newline at end of file
diff --git a/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java b/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java
new file mode 100644
index 0000000..ae3ad60
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/batching/BatchProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.io.Serializable;
+import java.time.Duration;
+import org.apache.samza.table.remote.TablePart;
+
+
+public abstract class BatchProvider<K, V> implements TablePart, Serializable {
+  public abstract Batch<K, V> getBatch();
+
+  private int maxBatchSize = 100;
+  private Duration maxBatchDelay = Duration.ofMillis(100);
+
+  public BatchProvider<K, V> withMaxBatchSize(int maxBatchSize) {
+    this.maxBatchSize = maxBatchSize;
+    return this;
+  }
+
+  public BatchProvider<K, V> withMaxBatchDelay(Duration maxBatchDelay) {
+    this.maxBatchDelay = maxBatchDelay;
+    return this;
+  }
+
+  public int getMaxBatchSize() {
+    return maxBatchSize;
+  }
+
+  public Duration getMaxBatchDelay() {
+    return maxBatchDelay;
+  }
+}
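As a hedged illustration of this extension point, the sketch below shows one way a hypothetical user-defined provider (`FifoBatchProvider`, not part of Samza) might implement the contract: operations are kept in arrival order, the batch closes once the configured capacity is reached, and operations carrying extra args are rejected with the `BatchingNotSupportedException` (defined in samza-core) that triggers the un-batched fallback path.

```java
package org.example.samza.batching; // hypothetical user package

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.samza.table.batching.Batch;
import org.apache.samza.table.batching.BatchProvider;
import org.apache.samza.table.batching.BatchingNotSupportedException;
import org.apache.samza.table.batching.Operation;

/**
 * A user-defined provider whose batches keep operations in arrival order
 * and close once the configured capacity is reached.
 */
public class FifoBatchProvider<K, V> extends BatchProvider<K, V> {
  @Override
  public Batch<K, V> getBatch() {
    return new Batch<K, V>() {
      private final List<Operation<K, V>> operations = new ArrayList<>();
      private final CompletableFuture<Void> future = new CompletableFuture<>();
      private boolean closed = false;

      @Override
      public CompletableFuture<Void> addOperation(Operation<K, V> operation) {
        if (operation.getArgs() != null && operation.getArgs().length > 0) {
          // Ask the batching table to fall back to the un-batched code path.
          throw new BatchingNotSupportedException("operations with extra args are not batched");
        }
        operations.add(operation);
        if (operations.size() >= getMaxBatchSize()) {
          close(); // reached the configured capacity
        }
        return future;
      }

      @Override
      public void close() {
        closed = true;
      }

      @Override
      public boolean isClosed() {
        return closed;
      }

      @Override
      public Collection<Operation<K, V>> getOperations() {
        return operations;
      }

      @Override
      public Duration getMaxBatchDelay() {
        return FifoBatchProvider.this.getMaxBatchDelay();
      }

      @Override
      public int getMaxBatchSize() {
        return FifoBatchProvider.this.getMaxBatchSize();
      }

      @Override
      public void complete() {
        future.complete(null);
      }

      @Override
      public void completeExceptionally(Throwable throwable) {
        future.completeExceptionally(throwable);
      }

      @Override
      public int size() {
        return operations.size();
      }
    };
  }
}
```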
diff --git a/samza-api/src/main/java/org/apache/samza/table/batching/Operation.java b/samza-api/src/main/java/org/apache/samza/table/batching/Operation.java
new file mode 100644
index 0000000..d7acd9a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/batching/Operation.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+/**
+ * Interface for operations that can be batched.
+ *
+ * @param <K> The key type associated with the operation.
+ * @param <V> The value type associated with the operation.
+ */
+public interface Operation<K, V> {
+  /**
+   * @return The key associated with the operation.
+   */
+  K getKey();
+
+  /**
+   * @return The value associated with the operation.
+   */
+  V getValue();
+
+  /**
+   * @return The extra arguments associated with the operation.
+   */
+  Object[] getArgs();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
index 3c7b62b..b0590c5 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.table.batching.BatchProvider;
 import org.apache.samza.table.remote.TablePart;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
@@ -74,6 +75,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
   public static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
   public static final String READ_RETRY_POLICY = "io.read.retry.policy";
   public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
+  public static final String BATCH_PROVIDER = "io.batch.provider";
 
   // Input support for a specific remote store (required)
   private TableReadFunction<K, V> readFn;
@@ -84,12 +86,14 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
   // Rate limiter for client-side throttling; it is set by withRateLimiter()
   private RateLimiter rateLimiter;
 
-  // Indicate whether read rate limiter is enabled or not
   private boolean enableReadRateLimiter = true;
 
   // Indicate whether write rate limiter is enabled or not
   private boolean enableWriteRateLimiter = true;
 
+  // Batching support to reduce traffic volume sent to the remote store.
+  private BatchProvider<K, V> batchProvider;
+
   // Rates for constructing the default rate limiter when they are non-zero
   private Map<String, Integer> tagCreditsMap = new HashMap<>();
 
@@ -259,6 +263,11 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
     return this;
   }
 
+  public RemoteTableDescriptor<K, V> withBatchProvider(BatchProvider<K, V> batchProvider) {
+    this.batchProvider = batchProvider;
+    return this;
+  }
+
   @Override
   public String getProviderFactoryClassName() {
     return PROVIDER_FACTORY_CLASS_NAME;
@@ -327,6 +336,10 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
       addTablePartConfig(WRITE_FN, writeFn, jobConfig, tableConfig);
     }
 
+    if (batchProvider != null) {
+      addTableConfig(BATCH_PROVIDER, SerdeUtils.serialize("batch provider", batchProvider), tableConfig);
+      addTablePartConfig(BATCH_PROVIDER, batchProvider, jobConfig, tableConfig);
+    }
     return Collections.unmodifiableMap(tableConfig);
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/AbstractBatch.java b/samza-core/src/main/java/org/apache/samza/table/batching/AbstractBatch.java
new file mode 100644
index 0000000..5f65f77
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/AbstractBatch.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+
+abstract class AbstractBatch<K, V> implements Batch<K, V> {
+  protected final int maxBatchSize;
+  protected final Duration maxBatchDelay;
+  protected final CompletableFuture<Void> completableFuture;
+  protected boolean closed = false;
+  protected static final BatchingNotSupportedException BATCH_NOT_SUPPORTED_EXCEPTION = new BatchingNotSupportedException();
+
+  AbstractBatch(int maxBatchSize, Duration maxBatchDelay) {
+    // The max number of {@link Operation}s that the batch can hold.
+    this.maxBatchSize = maxBatchSize;
+    // The max time that the batch can last before being performed.
+    this.maxBatchDelay = maxBatchDelay;
+    completableFuture = new CompletableFuture<>();
+  }
+
+  /**
+   * @return The batch capacity.
+   */
+  @Override
+  public int getMaxBatchSize() {
+    return maxBatchSize;
+  }
+
+  /**
+   * @return The batch max delay.
+   */
+  @Override
+  public Duration getMaxBatchDelay() {
+    return maxBatchDelay;
+  }
+
+  @Override
+  public void complete() {
+    completableFuture.complete(null);
+  }
+
+  @Override
+  public void completeExceptionally(Throwable throwable) {
+    completableFuture.completeExceptionally(throwable);
+  }
+
+  @Override
+  public void close() {
+    closed = true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return closed;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java b/samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java
new file mode 100644
index 0000000..f432ad0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/AsyncBatchingTable.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.samza.SamzaException;
+import org.apache.samza.context.Context;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+import org.apache.samza.table.utils.TableMetricsUtil;
+import org.apache.samza.util.HighResolutionClock;
+
+
+/**
+ * A wrapper around an {@link AsyncReadWriteTable} that supports batch operations.
+ *
+ * This batching table does not guarantee any ordering across operation types within a batch.
+ * For instance, if query (Q) and update (U) operations arrive in the sequence Q1, U1, Q2, U2,
+ * the remote data store will not necessarily receive them in that order. Instead, the operations
+ * are grouped by type and sent as micro batches: Q1 and Q2 are grouped into micro batch B1, and
+ * U1 and U2 into micro batch B2; the implementation class decides the order of the micro batches.
+ *
+ * Synchronous table operations (get/put/delete) should be used with caution together with the
+ * batching feature. If the table is used by a single thread, there will be at most one operation
+ * in the batch, and the batch will only be performed when the batch window expires, so batching
+ * adds no value in that scenario.
+ *
+ * The Batch implementation class can throw {@link BatchingNotSupportedException} if it determines
+ * that an operation cannot be batched. When it receives this exception, {@link AsyncBatchingTable}
+ * sends the operation directly to the underlying {@link AsyncReadWriteTable}.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class AsyncBatchingTable<K, V> implements AsyncReadWriteTable<K, V> {
+  private final AsyncReadWriteTable<K, V> table;
+  private final String tableId;
+  private final BatchProvider<K, V> batchProvider;
+  private final ScheduledExecutorService batchTimerExecutorService;
+  private BatchProcessor<K, V> batchProcessor;
+
+  /**
+   * @param tableId The id of the table.
+   * @param table The target table that serves the batch operations.
+   * @param batchProvider Batch provider to create a batch instance.
+   */
+  public AsyncBatchingTable(String tableId, AsyncReadWriteTable table, BatchProvider<K, V> batchProvider,
+      ScheduledExecutorService batchTimerExecutorService) {
+    Preconditions.checkNotNull(tableId);
+    Preconditions.checkNotNull(table);
+    Preconditions.checkNotNull(batchProvider);
+    Preconditions.checkNotNull(batchTimerExecutorService);
+
+    this.tableId = tableId;
+    this.table = table;
+    this.batchProvider = batchProvider;
+    this.batchTimerExecutorService = batchTimerExecutorService;
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key, Object... args) {
+    try {
+      return batchProcessor.processQueryOperation(new GetOperation<>(key, args));
+    } catch (BatchingNotSupportedException e) {
+      return table.getAsync(key, args);
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object... args) {
+    return table.getAllAsync(keys);
+  }
+
+  @Override
+  public <T> CompletableFuture<T> readAsync(int opId, Object ... args) {
+    return table.readAsync(opId, args);
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value, Object... args) {
+    try {
+      return batchProcessor.processUpdateOperation(new PutOperation<>(key, value, args));
+    } catch (BatchingNotSupportedException e) {
+      return table.putAsync(key, value, args);
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries, Object... args) {
+    return table.putAllAsync(entries);
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key, Object... args) {
+    try {
+      return batchProcessor.processUpdateOperation(new DeleteOperation<>(key, args));
+    } catch (BatchingNotSupportedException e) {
+      return table.deleteAsync(key, args);
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object... args) {
+    return table.deleteAllAsync(keys);
+  }
+
+  @Override
+  public void init(Context context) {
+    table.init(context);
+    final TableMetricsUtil metricsUtil = new TableMetricsUtil(context, this, tableId);
+
+    createBatchProcessor(TableMetricsUtil.mayCreateHighResolutionClock(context.getJobContext().getConfig()),
+        new BatchMetrics(metricsUtil));
+  }
+
+  @Override
+  public <T> CompletableFuture<T> writeAsync(int opId, Object ... args) {
+    return table.writeAsync(opId, args);
+  }
+
+  @Override
+  public void flush() {
+    table.flush();
+  }
+
+  @Override
+  public void close() {
+    batchProcessor.stop();
+    table.close();
+  }
+
+  @VisibleForTesting
+  void createBatchProcessor(HighResolutionClock clock, BatchMetrics batchMetrics) {
+    batchProcessor = new BatchProcessor<>(batchMetrics, new TableBatchHandler<>(table),
+        batchProvider, clock, batchTimerExecutorService);
+  }
+
+  @VisibleForTesting
+  BatchProcessor<K, V> getBatchProcessor() {
+    return batchProcessor;
+  }
+}
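As a rough usage sketch of the behavior described in the class javadoc above, assuming `table` is a remote table configured with a batch provider and `"member-1"`/`profile` are placeholder key and value:

```java
// putAsync and getAsync are queued into the current batch rather than sent immediately.
CompletableFuture<Void> write = table.putAsync("member-1", profile);
CompletableFuture<Profile> read = table.getAsync("member-1");

// Both futures resolve after the batch closes (max size reached or max delay elapsed)
// and the grouped micro batches have been sent to the remote store.
CompletableFuture.allOf(write, read).join();
```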
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java
new file mode 100644
index 0000000..81256ef
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Define how the batch operations will be handled.
+ *
+ * @param <K> The key type of the operations
+ * @param <V> The value type of the operations.
+ */
+public interface BatchHandler<K, V> {
+  /**
+   *
+   * @param batch The batch to be handled
+   * @return A {@link CompletableFuture} that indicates the status of the handle process.
+   */
+  CompletableFuture<Void> handle(Batch<K, V> batch);
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchMetrics.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchMetrics.java
new file mode 100644
index 0000000..f160545
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchMetrics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.utils.TableMetricsUtil;
+
+
+/**
+ * Wrapper of batch-related metrics.
+ */
+class BatchMetrics {
+  /**
+   * The number of batches
+   */
+  final Counter batchCount;
+
+  /**
+   * The time duration between opening and closing the batch
+   */
+  final Timer batchDuration;
+
+  public BatchMetrics(TableMetricsUtil metricsUtil) {
+    batchCount = metricsUtil.newCounter("num-batches");
+    batchDuration = metricsUtil.newTimer("batch-ns");
+  }
+
+  public void incBatchCount() {
+    batchCount.inc();
+  }
+
+  public void updateBatchDuration(long d) {
+    batchDuration.update(d);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
new file mode 100644
index 0000000..bde6d48
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.samza.util.HighResolutionClock;
+
+
+/**
+ * Places a sequence of operations into a {@link Batch}. When a batch can no longer accept
+ * operations, it will be handled by a {@link BatchHandler}. Meanwhile, a new {@link Batch} will be
+ * created and a timer will be set for it.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+public class BatchProcessor<K, V> {
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final ReentrantLock lock = new ReentrantLock();
+  private final BatchHandler<K, V> batchHandler;
+  private final BatchProvider<K, V> batchProvider;
+  private final BatchMetrics batchMetrics;
+  private final HighResolutionClock clock;
+  private Batch<K, V> batch;
+  private ScheduledFuture<?> scheduledFuture;
+  private long batchOpenTimestamp;
+
+  /**
+   * @param batchHandler Defines how each batch will be processed.
+   * @param batchProvider The batch provider to create a batch instance.
+   * @param clock A clock used to get the timestamp.
+   * @param scheduledExecutorService A scheduled executor service to set timers for the managed batches.
+   */
+  public BatchProcessor(BatchMetrics batchMetrics, BatchHandler<K, V> batchHandler, BatchProvider batchProvider,
+      HighResolutionClock clock, ScheduledExecutorService scheduledExecutorService) {
+    Preconditions.checkNotNull(batchHandler);
+    Preconditions.checkNotNull(batchProvider);
+    Preconditions.checkNotNull(clock);
+    Preconditions.checkNotNull(scheduledExecutorService);
+
+    this.batchHandler = batchHandler;
+    this.batchProvider = batchProvider;
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.batchMetrics = batchMetrics;
+    this.clock = clock;
+  }
+
+  private CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    if (batch == null) {
+      startNewBatch();
+    }
+    final CompletableFuture<Void> res = batch.addOperation(operation);
+    if (batch.isClosed()) {
+      processBatch(true);
+    }
+    return res;
+  }
+
+  /**
+   * @param operation The query operation to be added to the batch.
+   * @return A {@link CompletableFuture} to indicate whether the operation is finished.
+   */
+  CompletableFuture<V> processQueryOperation(Operation<K, V> operation) {
+    Preconditions.checkNotNull(operation);
+    Preconditions.checkArgument(operation instanceof GetOperation);
+
+    lock.lock();
+    try {
+      GetOperation<K, V> getOperation = (GetOperation) operation;
+      addOperation(getOperation);
+      return getOperation.getCompletableFuture();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @param operation The update operation to be added to the batch.
+   * @return A {@link CompletableFuture} to indicate whether the operation is finished.
+   */
+  CompletableFuture<Void> processUpdateOperation(Operation<K, V> operation) {
+    Preconditions.checkNotNull(operation);
+    Preconditions.checkArgument(operation instanceof PutOperation || operation instanceof DeleteOperation);
+
+    lock.lock();
+    try {
+      return addOperation(operation);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void processBatch(boolean cancelTimer) {
+    mayCancelTimer(cancelTimer);
+    closeBatch();
+    batchHandler.handle(batch);
+    startNewBatch();
+  }
+
+  private void startNewBatch() {
+    batch = batchProvider.getBatch();
+    batchOpenTimestamp = clock.nanoTime();
+    batchMetrics.incBatchCount();
+    setBatchTimer(batch);
+  }
+
+  private void closeBatch() {
+    batch.close();
+    batchMetrics.updateBatchDuration(clock.nanoTime() - batchOpenTimestamp);
+  }
+
+  private void mayCancelTimer(boolean cancelTimer) {
+    if (cancelTimer && scheduledFuture != null) {
+      scheduledFuture.cancel(true);
+    }
+  }
+
+  /**
+   * Set a timer to close the batch when the batch is older than the max delay.
+   */
+  private void setBatchTimer(Batch<K, V> batch) {
+    final long maxDelay = batch.getMaxBatchDelay().toMillis();
+    if (maxDelay != Integer.MAX_VALUE) {
+      scheduledFuture = scheduledExecutorService.schedule(() -> {
+          lock.lock();
+          try {
+            processBatch(false);
+          } finally {
+            lock.unlock();
+          }
+        }, maxDelay, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Stop the processor and cancel all the pending futures.
+   */
+  void stop() {
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(true);
+    }
+  }
+
+  /**
+   * Get the number of operations in the current batch.
+   */
+  @VisibleForTesting
+  int size() {
+    return batch == null ? 0 : batch.size();
+  }
+
+  /**
+   * Get the latest update operation for the specified key.
+   */
+  @VisibleForTesting
+  Operation<K, V> getLastUpdate(K key) {
+    final Collection<Operation<K, V>> operations = batch.getOperations();
+    final Iterator<Operation<K, V>> iterator = operations.iterator();
+    Operation<K, V> lastUpdate = null;
+    while (iterator.hasNext()) {
+      final Operation<K, V> operation = iterator.next();
+      if ((operation instanceof PutOperation || operation instanceof DeleteOperation)
+          && operation.getKey().equals(key)) {
+        lastUpdate = operation;
+      }
+    }
+    return lastUpdate;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/BatchingNotSupportedException.java b/samza-core/src/main/java/org/apache/samza/table/batching/BatchingNotSupportedException.java
new file mode 100644
index 0000000..ac0a19f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/BatchingNotSupportedException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+/**
+ * When adding an {@link Operation} to a {@link Batch}, if the batch implementation class
+ * does not want to batch the operation, it should throw a {@link BatchingNotSupportedException}.
+ */
+public class BatchingNotSupportedException extends UnsupportedOperationException {
+  public BatchingNotSupportedException() {
+    super();
+  }
+
+  public BatchingNotSupportedException(String message) {
+    super(message);
+  }
+
+  public BatchingNotSupportedException(Throwable cause) {
+    super(cause);
+  }
+
+  public BatchingNotSupportedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
new file mode 100644
index 0000000..8589505
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatch.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+/**
+ * A newer update operation will override the value of an earlier one with the same key.
+ * Query operations with the same key will be combined into a single query.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+class CompactBatch<K, V> extends AbstractBatch<K, V> {
+  private final Map<K, Operation<K, V>> updates = new LinkedHashMap<>();
+  private final Map<K, Operation<K, V>> queries = new LinkedHashMap<>();
+
+  public CompactBatch(int maxBatchSize, Duration maxBatchDelay) {
+    super(maxBatchSize, maxBatchDelay);
+  }
+
+  /**
+   * @return The batch size.
+   */
+  @Override
+  public int size() {
+    return updates.size() + queries.size();
+  }
+
+  /**
+   * When adding a GetOperation, query operations with the same key are collapsed into a single query.
+   * When adding a Put or Delete operation, an existing update operation for the same key is replaced
+   * by the new one. Operations that carry extra arguments are rejected with a
+   * {@link BatchingNotSupportedException}.
+   */
+  @Override
+  public CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    Preconditions.checkNotNull(operation);
+
+    if (operation.getArgs() != null && operation.getArgs().length > 0) {
+      throw BATCH_NOT_SUPPORTED_EXCEPTION;
+    }
+
+    if (operation instanceof GetOperation) {
+      queries.putIfAbsent(operation.getKey(), operation);
+    } else {
+      updates.put(operation.getKey(), operation);
+    }
+    if (size() >= maxBatchSize) {
+      close();
+    }
+    return completableFuture;
+  }
+
+  @Override
+  public Collection<Operation<K, V>> getOperations() {
+    return Stream.of(queries.values(), updates.values()).flatMap(Collection::stream).collect(Collectors.toList());
+  }
+}
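For illustration, a minimal sketch of the compaction semantics, assuming `table` is a remote table configured with a `CompactBatchProvider`, both calls land in the same batch window, and the key and values are placeholders:

```java
CompletableFuture<Void> first = table.putAsync("k", "v1");
CompletableFuture<Void> second = table.putAsync("k", "v2");

// Both puts are compacted into a single update: when the batch is flushed,
// the remote store only receives the latest value, "v2", for key "k".
CompletableFuture.allOf(first, second).join();
```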
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java
new file mode 100644
index 0000000..d5ed8a8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompactBatchProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+public class CompactBatchProvider<K, V> extends BatchProvider<K, V> {
+  @Override
+  public Batch<K, V> getBatch() {
+    return new CompactBatch<>(getMaxBatchSize(), getMaxBatchDelay());
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatch.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatch.java
new file mode 100644
index 0000000..0a9aadb
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatch.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * The complete sequence of operations is stored in the batch, regardless of whether keys repeat.
+ *
+ * @param <K> The type of the key associated with the {@link Operation}
+ * @param <V> The type of the value associated with the {@link Operation}
+ */
+class CompleteBatch<K, V> extends AbstractBatch<K, V> {
+  private final List<Operation<K, V>> operations = new ArrayList<>();
+
+  public CompleteBatch(int maxBatchSize, Duration maxBatchDelay) {
+    super(maxBatchSize, maxBatchDelay);
+  }
+
+  /**
+   * @return The batch size.
+   */
+  @Override
+  public int size() {
+    return operations.size();
+  }
+
+  /**
+   * All operations will be buffered without any compaction.
+   */
+  @Override
+  public CompletableFuture<Void> addOperation(Operation<K, V> operation) {
+    Preconditions.checkNotNull(operation);
+    operations.add(operation);
+    if (size() >= maxBatchSize) {
+      close();
+    }
+    return completableFuture;
+  }
+
+  @Override
+  public Collection<Operation<K, V>> getOperations() {
+    return operations;
+  }
+}
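By contrast, a sketch of the same two calls when the table is configured with a `CompleteBatchProvider` (same placeholder assumptions as the compaction sketch above):

```java
CompletableFuture<Void> first = table.putAsync("k", "v1");
CompletableFuture<Void> second = table.putAsync("k", "v2");

// No compaction: both operations stay in the batch, so the remote store receives
// both writes (grouped into a single putAll call) when the batch is flushed.
CompletableFuture.allOf(first, second).join();
```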
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java
new file mode 100644
index 0000000..8068281
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/CompleteBatchProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+public class CompleteBatchProvider<K, V> extends BatchProvider<K, V> {
+  @Override
+  public Batch<K, V> getBatch() {
+    return new CompleteBatch<>(getMaxBatchSize(), getMaxBatchDelay());
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java b/samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java
new file mode 100644
index 0000000..3f19f6d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/DeleteOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Delete operation.
+ *
+ * @param <K> The type of the key.
+ */
+public class DeleteOperation<K, V> implements Operation<K, V> {
+  final K key;
+  final Object[] args;
+
+  public DeleteOperation(K key, Object ... args) {
+    Preconditions.checkNotNull(key);
+
+    this.key = key;
+    this.args = args;
+  }
+
+  /**
+   * @return The key to be deleted.
+   */
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  /**
+   * @return null.
+   */
+  @Override
+  public V getValue() {
+    return null;
+  }
+
+  /**
+   * @return The extra arguments associated with the operation.
+   */
+  @Override
+  public Object[] getArgs() {
+    return args;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/GetOperation.java b/samza-core/src/main/java/org/apache/samza/table/batching/GetOperation.java
new file mode 100644
index 0000000..cae7779
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/GetOperation.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Get operation.
+ *
+ * @param <K> The type of the key.
+ */
+public class GetOperation<K, V> implements Operation<K, V> {
+  final K key;
+  final Object[] args;
+  final CompletableFuture<V> completableFuture = new CompletableFuture<>();
+
+  public GetOperation(K key, Object ... args) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(args);
+
+    this.key = key;
+    this.args = args;
+  }
+
+  /**
+   * @return The key of the operation.
+   */
+  @Override
+  public K getKey() {
+    Preconditions.checkNotNull(key);
+
+    return key;
+  }
+
+  /**
+   * @return null.
+   */
+  @Override
+  public V getValue() {
+    return null;
+  }
+
+  /**
+   * @return The extra arguments associated with the operation.
+   */
+  @Override
+  public Object[] getArgs() {
+    return args;
+  }
+
+  CompletableFuture<V> getCompletableFuture() {
+    return completableFuture;
+  }
+
+  void complete(V val) {
+    completableFuture.complete(val);
+  }
+
+  void completeExceptionally(Throwable ex) {
+    completableFuture.completeExceptionally(ex);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java b/samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java
new file mode 100644
index 0000000..48a75c5
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/PutOperation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Put operation.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value
+ */
+public class PutOperation<K, V> implements Operation<K, V> {
+  final private K key;
+  final private V val;
+  final private Object[] args;
+
+  public PutOperation(K key, V val, Object ... args) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(args);
+
+    this.key = key;
+    this.val = val;
+    this.args = args;
+  }
+
+  /**
+   * @return The key to be put to the table.
+   */
+  @Override
+  public K getKey() {
+    return key;
+  }
+
+  /**
+   * @return The value to be put to the table.
+   */
+  @Override
+  public V getValue() {
+    return val;
+  }
+
+  /**
+   * @return The extra arguments associated with the operation.
+   */
+  @Override
+  public Object[] getArgs() {
+    return args;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/batching/TableBatchHandler.java b/samza-core/src/main/java/org/apache/samza/table/batching/TableBatchHandler.java
new file mode 100644
index 0000000..12ec6f6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/batching/TableBatchHandler.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import com.google.common.base.Preconditions;
+import java.util.stream.Collectors;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.AsyncReadWriteTable;
+
+
+/**
+ * Defines how a table performs batch operations.
+ *
+ * @param <K> The key type of the operation.
+ * @param <V> The value type of the operation.
+ */
+public class TableBatchHandler<K, V> implements BatchHandler<K, V> {
+  private final AsyncReadWriteTable<K, V> table;
+
+  public TableBatchHandler(AsyncReadWriteTable<K, V> table) {
+    Preconditions.checkNotNull(table);
+
+    this.table = table;
+  }
+
+  /**
+   * Define how batch get should be done.
+   *
+   * @param operations The batch get operations.
+   * @return A CompletableFuture to represent the status of the batch operation.
+   */
+  private CompletableFuture<?> handleBatchGet(Collection<Operation<K, V>> operations) {
+    Preconditions.checkNotNull(operations);
+    final List<K> gets = getOperationKeys(operations);
+    if (gets.isEmpty()) {
+      return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+    }
+
+    final Object[] args = getOperationArgs(operations);
+    final CompletableFuture<Map<K, V>> getsFuture = args == null ?
+        table.getAllAsync(gets) : table.getAllAsync(gets, args);
+
+    getsFuture.whenComplete((map, throwable) -> {
+        operations.forEach(operation -> {
+            GetOperation<K, V> getOperation = (GetOperation<K, V>) operation;
+            if (throwable != null) {
+              getOperation.completeExceptionally(throwable);
+            } else {
+              getOperation.complete(map.get(operation.getKey()));
+            }
+          });
+      });
+    return getsFuture;
+  }
+
+  /**
+   * Define how batch put should be done.
+   *
+   * @param operations The batch put operations.
+   * @return A CompletableFuture to represent the status of the batch operation.
+   */
+  private CompletableFuture<?> handleBatchPut(Collection<Operation<K, V>> operations) {
+    Preconditions.checkNotNull(operations);
+
+    final List<Entry<K, V>> puts = operations.stream()
+        .map(op -> new Entry<>(op.getKey(), op.getValue()))
+        .collect(Collectors.toList());
+    if (puts.isEmpty()) {
+      return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+    }
+
+    final Object[] args = getOperationArgs(operations);
+    return args == null ? table.putAllAsync(puts) : table.putAllAsync(puts, args);
+  }
+
+  /**
+   * Define how batch delete should be done.
+   *
+   * @param operations The batch delete operations.
+   * @return A CompletableFuture to represent the status of the batch operation.
+   */
+  private CompletableFuture<?> handleBatchDelete(Collection<Operation<K, V>> operations) {
+    Preconditions.checkNotNull(operations);
+
+    final List<K> deletes = getOperationKeys(operations);
+    if (deletes.isEmpty()) {
+      return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
+    }
+    final Object[] args = getOperationArgs(operations);
+    return args == null ? table.deleteAllAsync(deletes) : table.deleteAllAsync(deletes, args);
+  }
+
+  private List<K> getOperationKeys(Collection<Operation<K, V>> operations) {
+    return operations.stream().map(op -> op.getKey()).collect(Collectors.toList());
+  }
+
+  private Object[] getOperationArgs(Collection<Operation<K, V>> operations) {
+    if (operations.stream().noneMatch(operation -> operation.getArgs() != null && operation.getArgs().length > 0)) {
+      return null;
+    }
+    final List<Object[]> argsList = new ArrayList<>(operations.size());
+    operations.forEach(operation -> argsList.add(operation.getArgs()));
+    return argsList.toArray();
+  }
+
+  private List<Operation<K, V>> getQueryOperations(Batch<K, V> batch) {
+    return batch.getOperations().stream().filter(op -> op instanceof GetOperation)
+        .collect(Collectors.toList());
+  }
+
+  private List<Operation<K, V>> getPutOperations(Batch<K, V> batch) {
+    return batch.getOperations().stream().filter(op -> op instanceof PutOperation)
+        .collect(Collectors.toList());
+  }
+
+  private List<Operation<K, V>> getDeleteOperations(Batch<K, V> batch) {
+    return batch.getOperations().stream().filter(op -> op instanceof DeleteOperation)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Performs the batch by dispatching its put, delete, and get operations to the table.
+   */
+  @Override
+  public CompletableFuture<Void> handle(Batch<K, V> batch) {
+    return CompletableFuture.allOf(
+        handleBatchPut(getPutOperations(batch)),
+        handleBatchDelete(getDeleteOperations(batch)),
+        handleBatchGet(getQueryOperations(batch)))
+        .whenComplete((val, throwable) -> {
+            if (throwable != null) {
+              batch.completeExceptionally(throwable);
+            } else {
+              batch.complete();
+            }
+          });
+  }
+}
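
Putting the new pieces together: the sketch below is modeled on the unit tests added later in this patch and is only illustrative; it assumes an existing AsyncReadWriteTable named table and a BatchMetrics instance named batchMetrics are in scope.

    // Compact batches hold at most 10 updates; an open batch is flushed after 50 ms.
    BatchProvider<Integer, Integer> provider = new CompactBatchProvider<Integer, Integer>()
        .withMaxBatchDelay(Duration.ofMillis(50))
        .withMaxBatchSize(10);
    BatchHandler<Integer, Integer> handler = new TableBatchHandler<>(table);
    BatchProcessor<Integer, Integer> processor = new BatchProcessor<>(
        batchMetrics, handler, provider, () -> 0,          // () -> 0 is the no-op clock used by the tests
        Executors.newSingleThreadScheduledExecutor());

    // The put is buffered; once the batch is full or the delay elapses, the handler
    // forwards all buffered puts to the table as a single putAllAsync call.
    CompletableFuture<Void> done = processor.processUpdateOperation(new PutOperation<>(1, 1));
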
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
index 23f4c01..85f612c 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java
@@ -34,6 +34,8 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.AsyncReadWriteTable;
 import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batching.BatchProvider;
+import org.apache.samza.table.batching.AsyncBatchingTable;
 import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
 import org.apache.samza.table.retry.AsyncRetriableTable;
 import org.apache.samza.table.retry.TableRetryPolicy;
@@ -88,6 +90,10 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
   protected final TableRetryPolicy readRetryPolicy;
   protected final TableRetryPolicy writeRetryPolicy;
   protected final ScheduledExecutorService retryExecutor;
+  // batch
+  protected final BatchProvider<K, V> batchProvider;
+  protected final ScheduledExecutorService batchExecutor;
+
   // Other
   protected final ExecutorService callbackExecutor;
 
@@ -105,6 +111,8 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
    * @param readRetryPolicy read retry policy
    * @param writeRetryPolicy write retry policy
    * @param retryExecutor executor for invoking retries
+   * @param batchProvider batch provider to create a batch instance
+   * @param batchExecutor scheduled executor for batch
    * @param callbackExecutor executor for invoking async callbacks
    */
   public RemoteTable(
@@ -117,6 +125,8 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
       TableRetryPolicy readRetryPolicy,
       TableRetryPolicy writeRetryPolicy,
       ScheduledExecutorService retryExecutor,
+      BatchProvider<K, V> batchProvider,
+      ScheduledExecutorService batchExecutor,
       ExecutorService callbackExecutor) {
 
     super(tableId);
@@ -131,6 +141,8 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
     this.writeRetryPolicy = writeRetryPolicy;
     this.callbackExecutor = callbackExecutor;
     this.retryExecutor = retryExecutor;
+    this.batchProvider = batchProvider;
+    this.batchExecutor = batchExecutor;
 
     AsyncReadWriteTable table = new AsyncRemoteTable(readFn, writeFn);
     if (readRateLimiter != null || writeRateLimiter != null) {
@@ -139,6 +151,9 @@ public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
     if (readRetryPolicy != null || writeRetryPolicy != null) {
       table = new AsyncRetriableTable(tableId, table, readRetryPolicy, writeRetryPolicy, retryExecutor, readFn, writeFn);
     }
+    if (batchProvider != null) {
+      table = new AsyncBatchingTable(tableId, table, batchProvider, batchExecutor);
+    }
 
     asyncTable = table;
   }
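
Note that the batching layer is only added when a BatchProvider has been configured for the table. On the application side this is done through RemoteTableDescriptor#withBatchProvider; a minimal descriptor-side sketch, mirroring the end-to-end test added further down in this patch (readFn stands in for an existing TableReadFunction):

    RemoteTableDescriptor<Integer, Profile> tableDesc = new RemoteTableDescriptor<>("profile-table-1");
    tableDesc
        .withReadFunction(readFn)
        // Flush after 4 buffered operations, or after 100 ms, whichever comes first.
        .withBatchProvider(new CompactBatchProvider()
            .withMaxBatchSize(4)
            .withMaxBatchDelay(Duration.ofMillis(100)));
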
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
index aca0a4b..8cd1c07 100644
--- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
+++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.batching.BatchProvider;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.table.BaseTableProvider;
@@ -51,6 +52,7 @@ public class RemoteTableProvider extends BaseTableProvider {
    */
   private static Map<String, ExecutorService> rateLimitingExecutors = new ConcurrentHashMap<>();
   private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
+  private static Map<String, ScheduledExecutorService> batchExecutors = new ConcurrentHashMap<>();
   private static ScheduledExecutorService retryExecutor;
 
   public RemoteTableProvider(String tableId) {
@@ -116,10 +118,22 @@ public class RemoteTableProvider extends BaseTableProvider {
             }));
     }
 
+    BatchProvider batchProvider = deserializeObject(tableConfig, RemoteTableDescriptor.BATCH_PROVIDER);
+    if (batchProvider != null) {
+      batchExecutors.computeIfAbsent(tableId, (arg) ->
+          Executors.newSingleThreadScheduledExecutor(runnable -> {
+              Thread thread = new Thread(runnable);
+              thread.setName("table-" + tableId + "-batch-scheduled-executor");
+              thread.setDaemon(true);
+              return thread;
+            }));
+    }
+
+
     RemoteTable table = new RemoteTable(tableId,
         readFn, writeFn,
         readRateLimiter, writeRateLimiter, rateLimitingExecutors.get(tableId),
-        readRetryPolicy, writeRetryPolicy, retryExecutor,
+        readRetryPolicy, writeRetryPolicy, retryExecutor, batchProvider, batchExecutors.get(tableId),
         callbackExecutors.get(tableId));
     table.init(this.context);
     tables.add(table);
@@ -134,6 +148,8 @@ public class RemoteTableProvider extends BaseTableProvider {
     rateLimitingExecutors.clear();
     callbackExecutors.values().forEach(e -> e.shutdown());
     callbackExecutors.clear();
+    batchExecutors.values().forEach(e -> e.shutdown());
+    batchExecutors.clear();
   }
 
   private <T> T deserializeObject(JavaTableConfig tableConfig, String key) {
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
index 90d6fe8..bb8abed 100644
--- a/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/table/utils/TableMetricsUtil.java
@@ -20,6 +20,8 @@
 package org.apache.samza.table.utils;
 
 import com.google.common.base.Preconditions;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
@@ -29,6 +31,7 @@ import org.apache.samza.table.Table;
 import org.apache.samza.table.caching.SupplierGauge;
 
 import java.util.function.Supplier;
+import org.apache.samza.util.HighResolutionClock;
 
 
 /**
@@ -103,4 +106,8 @@ public class TableMetricsUtil {
     return String.format("%s-%s", tableId, name);
   }
 
+  public static HighResolutionClock mayCreateHighResolutionClock(Config config) {
+    final MetricsConfig metricsConfig = new MetricsConfig(config);
+    return metricsConfig.getMetricsTimerEnabled() ? System::nanoTime : () -> 0;
+  }
 }
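
The new helper returns a real nanosecond clock only when timer metrics are enabled, and a clock that always reads zero otherwise, so callers can time operations unconditionally. A rough usage sketch (assuming HighResolutionClock exposes a no-arg nanoTime() method, which the System::nanoTime method reference suggests):

    HighResolutionClock clock = TableMetricsUtil.mayCreateHighResolutionClock(config);
    long startNs = clock.nanoTime();              // always 0 when timer metrics are disabled
    // ... perform the batched table operation ...
    long elapsedNs = clock.nanoTime() - startNs;  // recorded in a timer metric by the caller
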
diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
new file mode 100644
index 0000000..dca7e75
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchProcessor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static java.lang.Thread.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    TestBatchProcessor.TestCreate.class,
+    TestBatchProcessor.TestUpdatesAndLookup.class,
+    TestBatchProcessor.TestBatchTriggered.class
+  })
+public class TestBatchProcessor {
+  private static final int SLOW_OPERATION_TIME_MS = 500;
+  private static final Supplier<Void> SLOW_UPDATE_SUPPLIER = () -> {
+    try {
+      sleep(SLOW_OPERATION_TIME_MS);
+    } catch (InterruptedException e) {
+      // ignore
+    }
+    return null;
+  };
+
+  public static class TestCreate {
+    @Test
+    public void testCreate() {
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      final BatchProcessor<Integer, Integer> batchProcessor = createBatchProcessor(table,
+          3, Integer.MAX_VALUE);
+
+      // The batch processor initially contains no operations.
+      Assert.assertEquals(0, batchProcessor.size());
+      batchProcessor.processUpdateOperation(new PutOperation<>(1, 1));
+      // The batch processor now has one operation.
+      Assert.assertEquals(1, batchProcessor.size());
+    }
+  }
+
+  public static class TestUpdatesAndLookup {
+    @Test
+    public void testUpdateAndLookup() {
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      final BatchProcessor<Integer, Integer> batchProcessor =
+          createBatchProcessor(table, Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+      int numberOfPuts = 10;
+      for (int i = 0; i < numberOfPuts; i++) {
+        batchProcessor.processUpdateOperation(new PutOperation<>(i, i));
+      }
+
+      // Verify that the number of operations buffered in the batch processor is correct.
+      Assert.assertEquals(numberOfPuts, batchProcessor.size());
+
+      // Verify that the value is correct for each key.
+      for (int i = 0; i < numberOfPuts; i++) {
+        final Operation<Integer, Integer> operation = batchProcessor.getLastUpdate(i);
+        Assert.assertEquals(i, operation.getKey().intValue());
+        Assert.assertEquals(i, operation.getValue().intValue());
+      }
+    }
+  }
+
+  public static class TestBatchTriggered {
+    @Test
+    public void testBatchOperationTriggeredByBatchSize() {
+      final int maxBatchSize = 3;
+
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      when(table.putAllAsync(anyList())).thenReturn(CompletableFuture.supplyAsync(SLOW_UPDATE_SUPPLIER));
+
+      final BatchProcessor<Integer, Integer> batchProcessor =
+          createBatchProcessor(table, maxBatchSize, Integer.MAX_VALUE);
+
+      List<CompletableFuture<Void>> futureList = new ArrayList<>();
+      // One batch will be created and sent to the remote table after the for-loop.
+      for (int i = 0; i < maxBatchSize; i++) {
+        futureList.add(batchProcessor.processUpdateOperation(new PutOperation<>(i, i)));
+      }
+
+      for (int i = 0; i < maxBatchSize; i++) {
+        Assert.assertFalse(futureList.get(i).isDone());
+      }
+      Assert.assertEquals(0, batchProcessor.size());
+
+      try {
+        sleep(SLOW_OPERATION_TIME_MS * 2);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+
+      for (int i = 0; i < maxBatchSize; i++) {
+        Assert.assertTrue(futureList.get(i).isDone());
+      }
+    }
+
+    @Test
+    public void testBatchOperationTriggeredByTimer() {
+      final int maxBatchDelayMs = 100;
+      final int putOperationCount = 100;
+
+      final ReadWriteTable<Integer, Integer> table = mock(ReadWriteTable.class);
+      when(table.putAllAsync(any())).thenReturn(CompletableFuture.completedFuture(null));
+      when(table.deleteAllAsync(anyList())).thenReturn(CompletableFuture.completedFuture(null));
+
+      final BatchProcessor<Integer, Integer> batchProcessor =
+          createBatchProcessor(table, Integer.MAX_VALUE, maxBatchDelayMs);
+
+      for (int i = 0; i < putOperationCount; i++) {
+        batchProcessor.processUpdateOperation(new PutOperation<>(i, i));
+      }
+
+      // There is a single batch with an unbounded maximum size and a 100 ms maximum delay.
+      Assert.assertEquals(putOperationCount, batchProcessor.size());
+
+      try {
+        sleep(maxBatchDelayMs * 2);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+
+      // After the timer fires, the pending batch is flushed and a new, empty batch is created.
+      Assert.assertEquals(0, batchProcessor.size());
+    }
+  }
+
+  private static BatchProcessor<Integer, Integer> createBatchProcessor(ReadWriteTable<Integer, Integer> table,
+      int maxSize, int maxDelay) {
+    final BatchProvider<Integer, Integer> batchProvider = new CompactBatchProvider<Integer, Integer>()
+        .withMaxBatchDelay(Duration.ofMillis(maxDelay)).withMaxBatchSize(maxSize);
+    final BatchHandler<Integer, Integer> batchHandler = new TableBatchHandler<>(table);
+    final BatchMetrics batchMetrics = mock(BatchMetrics.class);
+    return new BatchProcessor<>(batchMetrics, batchHandler, batchProvider, () -> 0, Executors.newSingleThreadScheduledExecutor());
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchTable.java b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchTable.java
new file mode 100644
index 0000000..b1bd4a3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/table/batching/TestBatchTable.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.batching;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.ReadWriteTable;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+public class TestBatchTable {
+  private static final int BATCH_SIZE = 5;
+  private static final Duration BATCH_DELAY = Duration.ofMillis(Integer.MAX_VALUE);
+
+  private AsyncBatchingTable<Integer, Integer> asyncBatchingTable;
+  private ReadWriteTable<Integer, Integer> table;
+  private Map<Integer, Integer> tableDb;
+
+  @Before
+  public void setup() {
+    final Answer getAnswer = invocation -> {
+      Integer key = invocation.getArgumentAt(0, Integer.class);
+      return tableDb.get(key);
+    };
+
+    final Answer getAsyncAnswer = invocation -> {
+      Integer key = invocation.getArgumentAt(0, Integer.class);
+      return CompletableFuture.completedFuture(tableDb.get(key));
+    };
+
+    final Answer getAllAsyncAnswer = invocation -> {
+      final List<Integer> list = invocation.getArgumentAt(0, List.class);
+      final Map<Integer, Integer> map = new HashMap<>();
+      list.forEach(k -> map.put(k, tableDb.get(k)));
+      return CompletableFuture.completedFuture(map);
+    };
+
+    final Answer putAnswer = invocation -> {
+      Integer key = invocation.getArgumentAt(0, Integer.class);
+      Integer value = invocation.getArgumentAt(1, Integer.class);
+      tableDb.put(key, value);
+      return null;
+    };
+
+    final Answer putAsyncAnswer = invocation -> {
+      final Integer key = invocation.getArgumentAt(0, Integer.class);
+      final Integer value = invocation.getArgumentAt(1, Integer.class);
+      tableDb.put(key, value);
+      return CompletableFuture.completedFuture(null);
+    };
+
+    final Answer putAllAsyncAnswer = invocation -> {
+      final List<Entry<Integer, Integer>> list = invocation.getArgumentAt(0, List.class);
+      list.forEach(entry -> tableDb.put(entry.getKey(), entry.getValue()));
+      return CompletableFuture.completedFuture(null);
+    };
+
+    final Answer deleteAnswer = invocation -> {
+      final Integer key = invocation.getArgumentAt(0, Integer.class);
+      tableDb.remove(key);
+      return null;
+    };
+
+    final Answer deleteAsyncAnswer = invocation -> {
+      final Integer key = invocation.getArgumentAt(0, Integer.class);
+      tableDb.remove(key);
+      return CompletableFuture.completedFuture(null);
+    };
+
+    final Answer deleteAllAsyncAnswer = invocation -> {
+      final List<Integer> list = invocation.getArgumentAt(0, List.class);
+      list.forEach(k -> tableDb.remove(k));
+      return CompletableFuture.completedFuture(null);
+    };
+
+    table = mock(ReadWriteTable.class);
+    final BatchMetrics batchMetrics = mock(BatchMetrics.class);
+    tableDb = new HashMap<>();
+    asyncBatchingTable = new AsyncBatchingTable("id", table, new CompactBatchProvider()
+        .withMaxBatchSize(BATCH_SIZE)
+        .withMaxBatchDelay(BATCH_DELAY), Executors.newSingleThreadScheduledExecutor());
+    asyncBatchingTable.createBatchProcessor(() -> 0, mock(BatchMetrics.class));
+
+    doAnswer(putAnswer).when(table).put(anyInt(), anyInt());
+    doAnswer(putAsyncAnswer).when(table).putAsync(anyInt(), anyInt());
+    doAnswer(putAllAsyncAnswer).when(table).putAllAsync(anyList());
+
+    doAnswer(deleteAnswer).when(table).delete(anyInt());
+    doAnswer(deleteAsyncAnswer).when(table).deleteAsync(anyInt());
+    doAnswer(deleteAllAsyncAnswer).when(table).deleteAllAsync(anyList());
+
+    doAnswer(getAnswer).when(table).get(anyInt());
+    doAnswer(getAsyncAnswer).when(table).getAsync(anyInt());
+    doAnswer(getAllAsyncAnswer).when(table).getAllAsync(anyList());
+  }
+
+  @After
+  public void tearDown() {
+    asyncBatchingTable.close();
+  }
+
+  @Test
+  public void testPutAsync() {
+    final List<CompletableFuture<Void>> futures = new LinkedList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      futures.add(asyncBatchingTable.putAsync(i, i));
+    }
+    sleep();
+
+    final BatchProcessor<Integer, Integer> batchProcessor = asyncBatchingTable.getBatchProcessor();
+
+    // Verify that all async puts are finished.
+    futures.forEach(future -> Assert.assertTrue(future.isDone()));
+    verify(table, times(1)).putAllAsync(any());
+
+    // There should be no operations in the batch processor.
+    Assert.assertEquals(0, batchProcessor.size());
+
+    asyncBatchingTable.putAsync(BATCH_SIZE, BATCH_SIZE);
+
+    // Now batch size should be 1.
+    Assert.assertEquals(1, batchProcessor.size());
+  }
+
+  @Test
+  public void testPutAllAsync() {
+    final List<Entry<Integer, Integer>> entries = new LinkedList<>();
+
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      entries.add(new Entry<>(i, i));
+    }
+
+    CompletableFuture<Void> future = asyncBatchingTable.putAllAsync(entries);
+    final BatchProcessor<Integer, Integer> batchProcessor = asyncBatchingTable.getBatchProcessor();
+
+    sleep();
+
+    // Verify that putAllAsync is finished.
+    Assert.assertTrue(future.isDone());
+
+    // There should be no pending operations.
+    Assert.assertEquals(0, batchProcessor.size());
+
+    // The batched updates propagate to the table as a single putAllAsync call.
+    verify(table, times(1)).putAllAsync(anyList());
+
+    // This new put makes the batch size 1.
+    asyncBatchingTable.putAsync(BATCH_SIZE, BATCH_SIZE);
+
+    Assert.assertEquals(1, batchProcessor.size());
+  }
+
+  @Test
+  public void testGetAsync() throws ExecutionException, InterruptedException {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    final List<CompletableFuture<Integer>> futures = new ArrayList<>(BATCH_SIZE);
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      futures.add(asyncBatchingTable.getAsync(i));
+    }
+    sleep();
+
+    for (Integer i = 0; i < BATCH_SIZE; i++) {
+      Assert.assertTrue(futures.get(i).isDone());
+      Assert.assertEquals(i, futures.get(i).get());
+    }
+    verify(table, times(1)).getAllAsync(anyList());
+  }
+
+  @Test
+  public void testGetAllAsync() throws ExecutionException, InterruptedException {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    final List<Integer> keys = new LinkedList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      keys.add(i);
+    }
+
+    CompletableFuture<Map<Integer, Integer>> future = asyncBatchingTable.getAllAsync(keys);
+    sleep();
+
+    Assert.assertTrue(future.isDone());
+    Assert.assertEquals(BATCH_SIZE, future.get().size());
+
+    verify(table, times(1)).getAllAsync(anyList());
+  }
+
+  @Test
+  public void testDeleteAsync() throws Exception {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    // The 1st batch is done.
+    verify(table, times(1)).putAllAsync(anyList());
+
+    final List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      completableFutures.add(asyncBatchingTable.deleteAsync(i));
+    }
+    sleep();
+
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      Assert.assertEquals(null, completableFutures.get(i).get());
+    }
+  }
+
+  @Test
+  public void testDeleteAllAsync() throws Exception {
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      asyncBatchingTable.putAsync(i, i);
+    }
+    sleep();
+
+    final List<Integer> keys = new LinkedList<>();
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      keys.add(i);
+    }
+
+    final CompletableFuture<Void> future = asyncBatchingTable.deleteAllAsync(keys);
+    sleep();
+    Assert.assertTrue(future.isDone());
+
+    final CompletableFuture<Map<Integer, Integer>> getAllFuture = asyncBatchingTable.getAllAsync(keys);
+    sleep();
+
+    Assert.assertTrue(getAllFuture.isDone());
+    getAllFuture.get().forEach((k, v) -> Assert.assertEquals(null, v));
+  }
+
+  private void sleep() {
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      // ignore
+    }
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
index 44efc5b..d873c16 100644
--- a/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
@@ -286,6 +286,7 @@ public class TestCachingTable {
         tableId + "-remote", readFn, writeFn,
         rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
         null, null, null,
+        null, null,
         Executors.newSingleThreadExecutor());
 
     final CachingTable<String, String> cachingTable = new CachingTable<>(
@@ -407,6 +408,7 @@ public class TestCachingTable {
         tableId, readFn, writeFn,
         rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
         null, null, null,
+        null, null,
         Executors.newSingleThreadExecutor());
 
     final CachingTable<String, String> cachingTable = new CachingTable<>(
diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
index 02625bb..7a98504 100644
--- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
+++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java
@@ -86,7 +86,7 @@ public class TestRemoteTable {
 
     RemoteTable<K, V> table = new RemoteTable(tableId, readFn, writeFn,
         readRateLimiter, writeRateLimiter, rateLimitingExecutor,
-        readPolicy, writePolicy, retryExecutor, cbExecutor);
+        readPolicy, writePolicy, retryExecutor, null, null, cbExecutor);
     table.init(getMockContext());
     verify(readFn, times(1)).init(any(), any());
     if (writeFn != null) {
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
index 0a3b13e..a4561ee 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java
@@ -474,7 +474,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
     RemoteTable<String, String> table = new RemoteTable<>("table1", reader, null,
         rateLimitHelper, null, Executors.newSingleThreadExecutor(),
         null, null, null,
-        null);
+        null, null, null);
     table.init(createMockContext());
     table.get("abc");
   }
@@ -490,7 +490,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
     RemoteTable<String, String> table = new RemoteTable<String, String>("table1", reader, writer,
         rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(),
         null, null, null,
-        null);
+        null, null, null);
     table.init(createMockContext());
     table.put("abc", "efg");
   }
@@ -502,7 +502,7 @@ public class TestRemoteTableEndToEnd extends IntegrationTestHarness {
     RemoteTable<String, String> table = new RemoteTable<String, String>("table1", reader, null,
         rateLimitHelper, null, Executors.newSingleThreadExecutor(),
         null, null, null,
-        null);
+        null, null, null);
     table.init(createMockContext());
     try {
       table.put("abc", "efg");
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
new file mode 100644
index 0000000..3cad14e
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableWithBatchEndToEnd.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.batching.CompactBatchProvider;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
+import org.apache.samza.table.remote.BaseTableFunction;
+import org.apache.samza.table.remote.TableRateLimiter;
+import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.TableWriteFunction;
+import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.util.Base64Serializer;
+import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.test.table.TestTableData.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestRemoteTableWithBatchEndToEnd extends IntegrationTestHarness {
+
+  static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
+  static Map<String, AtomicInteger> batchWrites = new HashMap<>();
+  static Map<String, AtomicInteger> batchReads = new HashMap<>();
+
+  static class InMemoryReadFunction extends BaseTableFunction
+      implements TableReadFunction<Integer, Profile> {
+    private final String serializedProfiles;
+    private final String testName;
+    private transient Map<Integer, Profile> profileMap;
+    private transient AtomicInteger batchReadCounter;
+
+    private InMemoryReadFunction(String testName, String profiles) {
+      this.testName = testName;
+      this.serializedProfiles = profiles;
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+      Profile[] profiles = Base64Serializer.deserialize(this.serializedProfiles, Profile[].class);
+      this.profileMap = Arrays.stream(profiles).collect(Collectors.toMap(p -> p.getMemberId(), Function.identity()));
+      batchReadCounter = batchReads.get(testName);
+    }
+
+    @Override
+    public CompletableFuture<Profile> getAsync(Integer key) {
+      return CompletableFuture.completedFuture(profileMap.get(key));
+    }
+
+    @Override
+    public CompletableFuture<Map<Integer, Profile>> getAllAsync(Collection<Integer> keys) {
+      batchReadCounter.incrementAndGet();
+      return TableReadFunction.super.getAllAsync(keys);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+
+    static InMemoryReadFunction getInMemoryReadFunction(String testName, String serializedProfiles) {
+      return new InMemoryReadFunction(testName, serializedProfiles);
+    }
+  }
+
+  static class InMemoryWriteFunction extends BaseTableFunction
+      implements TableWriteFunction<Integer, EnrichedPageView> {
+    private transient List<EnrichedPageView> records;
+    private transient AtomicInteger batchWritesCounter;
+    private final String testName;
+
+    public InMemoryWriteFunction(String testName) {
+      this.testName = testName;
+    }
+
+    // Verify serializable functionality
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+
+      // Hook up the global list so that writes can be verified later
+      records = writtenRecords.get(testName);
+      batchWritesCounter = batchWrites.get(testName);
+    }
+
+    @Override
+    public CompletableFuture<Void> putAsync(Integer key, EnrichedPageView record) {
+      records.add(record);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteAsync(Integer key) {
+      records.remove(key);
+      return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> putAllAsync(Collection<Entry<Integer, EnrichedPageView>> records) {
+      batchWritesCounter.incrementAndGet();
+      return TableWriteFunction.super.putAllAsync(records);
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+
+  static class MyReadFunction extends BaseTableFunction
+      implements TableReadFunction {
+    @Override
+    public CompletableFuture getAsync(Object key) {
+      return null;
+    }
+
+    @Override
+    public boolean isRetriable(Throwable exception) {
+      return false;
+    }
+  }
+
+  private void doTestStreamTableJoinRemoteTable(String testName, boolean batchRead, boolean batchWrite) throws Exception {
+    final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName);
+
+    batchReads.put(testName, new AtomicInteger());
+    batchWrites.put(testName, new AtomicInteger());
+    writtenRecords.put(testName, new CopyOnWriteArrayList<>());
+
+    final int count = 16;
+    final int batchSize = 4;
+    PageView[] pageViews = generatePageViewsWithDistinctKeys(count);
+    String profiles = Base64Serializer.serialize(generateProfiles(count));
+
+    int partitionCount = 1;
+    Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
+
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+    configs.put("task.max.concurrency", String.valueOf(count));
+    configs.put("task.async.commit", String.valueOf(true));
+
+    final RateLimiter readRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+    final RateLimiter writeRateLimiter = mock(RateLimiter.class, withSettings().serializable());
+    final TableRateLimiter.CreditFunction creditFunction = (k, v, args)->1;
+    final StreamApplication app = appDesc -> {
+      RemoteTableDescriptor<Integer, Profile> inputTableDesc = new RemoteTableDescriptor<>("profile-table-1");
+      inputTableDesc
+          .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(testName, profiles))
+          .withRateLimiter(readRateLimiter, creditFunction, null);
+      if (batchRead) {
+        inputTableDesc.withBatchProvider(new CompactBatchProvider().withMaxBatchSize(batchSize).withMaxBatchDelay(Duration.ofHours(1)));
+      }
+
+      // dummy reader
+      TableReadFunction readFn = new MyReadFunction();
+
+      RemoteTableDescriptor<Integer, EnrichedPageView> outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1");
+      outputTableDesc
+          .withReadFunction(readFn)
+          .withWriteFunction(writer)
+          .withRateLimiter(writeRateLimiter, creditFunction, creditFunction);
+      if (batchWrite) {
+        outputTableDesc.withBatchProvider(new CompactBatchProvider().withMaxBatchSize(batchSize).withMaxBatchDelay(Duration.ofHours(1)));
+      }
+
+      Table<KV<Integer, EnrichedPageView>> outputTable = appDesc.getTable(outputTableDesc);
+
+      Table<KV<Integer, Profile>> inputTable = appDesc.getTable(inputTableDesc);
+
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<PageView> isd = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      appDesc.getInputStream(isd)
+          .map(pv -> new KV<>(pv.getMemberId(), pv))
+          .join(inputTable, new PageViewToProfileJoinFunction())
+          .map(m -> new KV(m.getMemberId(), m))
+          .sendTo(outputTable);
+    };
+
+    Config config = new MapConfig(configs);
+    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
+    executeRun(runner, config);
+
+    runner.waitForFinish();
+    int numExpected = count * partitionCount;
+    Assert.assertEquals(numExpected, writtenRecords.get(testName).size());
+    Assert.assertTrue(writtenRecords.get(testName).get(0) instanceof EnrichedPageView);
+
+    if (batchRead) {
+      Assert.assertEquals(numExpected / batchSize, batchReads.get(testName).get());
+    }
+    if (batchWrite) {
+      Assert.assertEquals(numExpected / batchSize, batchWrites.get(testName).get());
+    }
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableBatchingReadWrite() throws Exception {
+    doTestStreamTableJoinRemoteTable("testStreamTableJoinRemoteTableBatchingReadWrite", true, true);
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableBatchingRead() throws Exception {
+    doTestStreamTableJoinRemoteTable("testStreamTableJoinRemoteTableBatchingRead", true, false);
+  }
+
+  @Test
+  public void testStreamTableJoinRemoteTableBatchingWrite() throws Exception {
+    doTestStreamTableJoinRemoteTable("testStreamTableJoinRemoteTableBatchingWrite", false, true);
+  }
+}
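
For concreteness: with count = 16, partitionCount = 1 and batchSize = 4 above, numExpected is 16, so the batching assertions expect exactly 16 / 4 = 4 batched getAllAsync calls when batchRead is enabled and 4 batched putAllAsync calls when batchWrite is enabled against the in-memory table functions.
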
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index ed73961..21e57a8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -205,6 +205,16 @@ public class TestTableData {
     return pageviews;
   }
 
+  static public PageView[] generatePageViewsWithDistinctKeys(int count) {
+    Random random = new Random();
+    PageView[] pageviews = new PageView[count];
+    for (int i = 0; i < count; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      pageviews[i] = new PageView(pagekey, i);
+    }
+    return pageviews;
+  }
+
   private static final String[] COMPANIES = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
 
   static public Profile[] generateProfiles(int count) {