You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/19 22:06:30 UTC

samza git commit: SAMZA-1774: Support table API in low level

Repository: samza
Updated Branches:
  refs/heads/master 7055ce670 -> 525e8e2d8


SAMZA-1774: Support table API in low level

Code changes to support table in low level API.

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srini P<sp...@linkedin.com>

Closes #556 from atoomula/table1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/525e8e2d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/525e8e2d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/525e8e2d

Branch: refs/heads/master
Commit: 525e8e2d8205c7e25d6ba3a13ed22521490496e3
Parents: 7055ce6
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Jul 19 15:06:27 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu Jul 19 15:06:27 2018 -0700

----------------------------------------------------------------------
 .../samza/table/TableDescriptorsProvider.java   | 100 +++++++++++
 .../org/apache/samza/table/TableProvider.java   |   3 +-
 .../org/apache/samza/execution/JobNode.java     |  41 +----
 .../samza/table/TableConfigGenerator.java       | 138 ++++++++++++++++
 .../table/TestTableDescriptorsProvider.java     | 164 +++++++++++++++++++
 5 files changed, 405 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
new file mode 100644
index 0000000..766a4b4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+
+
+/**
+ * Provider to create a list of {@link TableDescriptor} objects to describe one or more Samza tables. This is the
+ * mechanism for providing table support for the Samza low-level API.
+ *
+ * Developers writing Samza jobs using Samza table(s) should describe the table(s) by implementing
+ * TableDescriptorsProvider.
+ *
+ * Typical user code using Samza tables should look like the following:
+ *
+ * <pre>
+ * {@code
+ * public class SampleTableDescriptorsProvider implements TableDescriptorsProvider {
+ *   private ReadableTable<String, Long> remoteTable;
+ *   private ReadWriteTable<String, String> localTable;
+ *
+ *   {@code @Override}
+ *   public List<TableDescriptor> getTableDescriptors(Config config) {
+ *     List<TableDescriptor> tableDescriptors = new ArrayList<>();
+ *     final TableReadFunction readRemoteTableFn = new MyStoreReadFunction();
+ *     tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
+ *       .withReadFunction(readRemoteTableFn)
+ *       .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
+ *
+ *     tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
+ *       .withBlockSize(4096)
+ *       .withSerde(KVSerde.of(new LongSerde(), new StringSerde()))
+ *       .withConfig("some-key", "some-value"));
+ *     return tableDescriptors;
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * [TODO:SAMZA-1772] will complete the work of introducing the low-level Table API. Until then, tables can be used
+ * from the low-level API by generating their configs from a TableDescriptorsProvider in a config rewriter, e.g.:
+ *
+ * <pre>
+ * {@code
+ * private Map<String, String> generateTableConfigs(Config config) {
+ *   String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
+ *   if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
+ *      // tableDescriptorsProviderClass is not configured
+ *      return config;
+ *   }
+ *
+ *   try {
+ *      if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
+ *         LOG.warn("TableDescriptorsProvider class {} does not implement TableDescriptorsProvider.",
+ *            tableDescriptorsProviderClassName);
+ *         return config;
+ *      }
+ *
+ *      TableDescriptorsProvider tableDescriptorsProvider =
+ *          Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
+ *      List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
+ *      return TableConfigGenerator.generateConfigsForTableDescs(tableDescs);
+ *   } catch (Exception e) {
+ *      throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
+ *          tableDescriptorsProviderClassName), e);
+ *   }
+ * }
+ * }
+ * </pre>
+ */
+@InterfaceStability.Unstable
+public interface TableDescriptorsProvider {
+  /**
+   * Constructs instances of the table descriptors.
+   * @param config the configuration supplied by the caller, available for building the descriptors
+   * @return list of table descriptors
+   */
+  List<TableDescriptor> getTableDescriptors(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index bbbe38a..8e60dad 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -47,8 +47,7 @@ public interface TableProvider {
   /**
    * Generate any configuration for this table, the generated configuration
    * is used by Samza container to construct this table and any components
-   * necessary
-   * .
+   * necessary.
    * @param config the current configuration
    * @return configuration for this table
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index db44d9f..6507996 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -31,7 +31,6 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
@@ -46,12 +45,11 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.StatefulOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.table.TableConfigGenerator;
 import org.apache.samza.util.MathUtil;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -179,21 +177,7 @@ public class JobNode {
     // write serialized serde instances and stream serde configs to configs
     addSerdeConfigs(configs);
 
-    tables.forEach(tableSpec -> {
-        // Table provider factory
-        configs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
-            tableSpec.getTableProviderFactoryClassName());
-
-        // Note: no need to generate config for Serde's, as they are already produced by addSerdeConfigs()
-
-        // Generate additional configuration
-        TableProviderFactory tableProviderFactory =
-            Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
-        TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
-        configs.putAll(tableProvider.generateConfig(configs));
-      });
-
-    log.info("Job {} has generated configs {}", jobName, configs);
+    configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(tables));
 
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
 
@@ -255,21 +239,11 @@ public class JobNode {
         }
       });
 
-    // collect all key and msg serde instances for tables
-    Map<String, Serde> tableKeySerdes = new HashMap<>();
-    Map<String, Serde> tableValueSerdes = new HashMap<>();
-    tables.forEach(tableSpec -> {
-        tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde());
-        tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde());
-      });
-
     // for each unique stream or store serde instance, generate a unique name and serialize to config
     HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
     serdes.addAll(streamMsgSerdes.values());
     serdes.addAll(storeKeySerdes.values());
     serdes.addAll(storeMsgSerdes.values());
-    serdes.addAll(tableKeySerdes.values());
-    serdes.addAll(tableValueSerdes.values());
     SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
     Base64.Encoder base64Encoder = Base64.getEncoder();
     Map<Serde, String> serdeUUIDs = new HashMap<>();
@@ -303,17 +277,6 @@ public class JobNode {
         String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
         configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
       });
-
-    // set key and msg serdes for tables to the serde names generated above
-    tableKeySerdes.forEach((tableId, serde) -> {
-        String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
-        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
-      });
-
-    tableValueSerdes.forEach((tableId, serde) -> {
-        String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
-        configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
-      });
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
new file mode 100644
index 0000000..ac17c68
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class to generate table configs.
+ */
+public class TableConfigGenerator {
+  private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);
+
+  /**
+   * Generate table configurations given a list of table descriptors.
+   * @param tableDescriptors the list of table descriptors; each one must be a {@link BaseTableDescriptor}
+   * @return configuration for the tables
+   */
+  public static Map<String, String> generateConfigsForTableDescs(List<TableDescriptor> tableDescriptors) {
+    return generateConfigsForTableSpecs(getTableSpecs(tableDescriptors));
+  }
+
+  /**
+   * Generate table configurations (serde, provider factory and provider-generated) given a list of table specs.
+   * @param tableSpecs the list of tableSpecs
+   * @return configuration for the tables
+   */
+  public static Map<String, String> generateConfigsForTableSpecs(List<TableSpec> tableSpecs) {
+    Map<String, String> tableConfigs = new HashMap<>(generateTableKVSerdeConfigs(tableSpecs));
+
+    tableSpecs.forEach(tableSpec -> {
+        // Add table provider factory config
+        tableConfigs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
+            tableSpec.getTableProviderFactoryClassName());
+
+        // Generate additional configuration; the provider is handed the table configs accumulated so far
+        TableProviderFactory tableProviderFactory =
+            Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
+        TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
+        tableConfigs.putAll(tableProvider.generateConfig(tableConfigs));
+      });
+
+    LOG.info("TableConfigGenerator has generated configs {}", tableConfigs);
+    return tableConfigs;
+  }
+
+  /** Generates the serialized serde instance configs and the per-table key/value serde name configs. */
+  private static Map<String, String> generateTableKVSerdeConfigs(List<TableSpec> tableSpecs) {
+    Map<String, String> serdeConfigs = new HashMap<>();
+
+    // Collect key and msg serde instances for all the tables
+    Map<String, Serde> tableKeySerdes = new HashMap<>();
+    Map<String, Serde> tableValueSerdes = new HashMap<>();
+    HashSet<Serde> serdes = new HashSet<>();
+
+    tableSpecs.forEach(tableSpec -> {
+        tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde());
+        tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde());
+      });
+    serdes.addAll(tableKeySerdes.values());
+    serdes.addAll(tableValueSerdes.values());
+
+    // Generate serde names
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+    Base64.Encoder base64Encoder = Base64.getEncoder();
+    Map<Serde, String> serdeUUIDs = new HashMap<>();
+    serdes.forEach(serde -> {
+        String serdeName = serdeUUIDs.computeIfAbsent(serde,
+            s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
+        serdeConfigs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName),
+            base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
+      });
+
+    // Set key and msg serdes for tables to the serde names generated above
+    tableKeySerdes.forEach((tableId, serde) -> {
+        String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
+        serdeConfigs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    tableValueSerdes.forEach((tableId, serde) -> {
+        String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
+        serdeConfigs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    return serdeConfigs;
+  }
+
+  /** Extracts the table spec from each descriptor, rejecting descriptors that resolve to an already-seen spec. */
+  private static List<TableSpec> getTableSpecs(List<TableDescriptor> tableDescs) {
+    Map<TableSpec, TableImpl> tableSpecs = new LinkedHashMap<>();
+
+    tableDescs.forEach(tableDesc -> {
+        TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
+
+        if (tableSpecs.containsKey(tableSpec)) {
+          throw new IllegalStateException(
+              String.format("Multiple table descriptors provided with the same tableId: %s", tableDesc.getTableId()));
+        }
+        tableSpecs.put(tableSpec, new TableImpl(tableSpec));
+      });
+    return new ArrayList<>(tableSpecs.keySet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
new file mode 100644
index 0000000..3ed29ca
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.ConfigRewriter;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.LongSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.RocksDbTableProviderFactory;
+import org.apache.samza.table.TableConfigGenerator;
+import org.apache.samza.table.TableDescriptorsProvider;
+import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.remote.RemoteTableProviderFactory;
+import org.apache.samza.table.remote.TableReadFunction;
+
+import org.apache.samza.util.RateLimiter;
+import org.apache.samza.util.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Table descriptors provider tests for both remote and local tables
+ */
+public class TestTableDescriptorsProvider {
+
+  @Test
+  public void testWithNoConfiguredTableDescriptorProviderClass() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    String tableRewriterName = "tableRewriter";
+    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
+    Assert.assertTrue(resultConfig.size() == 0);
+  }
+
+  @Test
+  public void testWithNonTableDescriptorsProviderClass() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    String tableRewriterName = "tableRewriter";
+    configs.put("tables.descriptors.provider.class", MySampleNonTableDescriptorsProvider.class.getName());
+    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
+    Assert.assertTrue(resultConfig.size() == 1);
+    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
+    Assert.assertTrue(tableConfig.getTableIds().size() == 0);
+  }
+
+  @Test
+  public void testWithTableDescriptorsProviderClass() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    String tableRewriterName = "tableRewriter";
+    configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName());
+    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
+    Assert.assertTrue(resultConfig.size() == 17);
+
+    String localTableId = "local-table-1";
+    String remoteTableId = "remote-table-1";
+
+    JavaStorageConfig storageConfig = new JavaStorageConfig(resultConfig);
+    Assert.assertTrue(storageConfig.getStoreNames().size() == 1);
+    Assert.assertEquals(storageConfig.getStoreNames().get(0), localTableId);
+    Assert.assertEquals(storageConfig.getStorageFactoryClassName(localTableId),
+        RocksDbKeyValueStorageEngineFactory.class.getName());
+    Assert.assertTrue(storageConfig.getStorageKeySerde(localTableId).startsWith("StringSerde"));
+    Assert.assertTrue(storageConfig.getStorageMsgSerde(localTableId).startsWith("StringSerde"));
+    Config storeConfig = resultConfig.subset("stores." + localTableId + ".", true);
+    Assert.assertTrue(storeConfig.size() == 4);
+    Assert.assertEquals(storeConfig.getInt("rocksdb.block.size.bytes"), 4096);
+
+    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
+    Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId),
+        RocksDbTableProviderFactory.class.getName());
+    Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId),
+        RemoteTableProviderFactory.class.getName());
+    Assert.assertTrue(tableConfig.getKeySerde(localTableId).startsWith("StringSerde"));
+    Assert.assertTrue(tableConfig.getValueSerde(localTableId).startsWith("StringSerde"));
+    Assert.assertTrue(tableConfig.getKeySerde(remoteTableId).startsWith("StringSerde"));
+    Assert.assertTrue(tableConfig.getValueSerde(remoteTableId).startsWith("LongSerde"));
+    Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), RocksDbTableProviderFactory.class.getName());
+    Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), RemoteTableProviderFactory.class.getName());
+  }
+
+  public static class MySampleNonTableDescriptorsProvider {
+  }
+
+  public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider {
+    @Override
+    public List<TableDescriptor> getTableDescriptors(Config config) {
+      List<TableDescriptor> tableDescriptors = new ArrayList<>();
+      final RateLimiter readRateLimiter = mock(RateLimiter.class);
+      final TableReadFunction readRemoteTable = (TableReadFunction) key -> null;
+
+      tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
+          .withReadFunction(readRemoteTable)
+          .withRateLimiter(readRateLimiter, null, null)
+          .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
+      tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
+          .withBlockSize(4096)
+          .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
+      return tableDescriptors;
+    }
+  }
+
+  /**
+   * A sample config rewriter to generate table configs. It instantiates the configured tableDescriptorsProvider class
+   * which implements {@link TableDescriptorsProvider} and generates the table configs.
+   */
+  public static class MySampleTableConfigRewriter implements ConfigRewriter {
+
+    @Override
+    public Config rewrite(String name, Config config) {
+      String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
+      if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
+        // tableDescriptorsProviderClass is not configured
+        return config;
+      }
+
+      try {
+        if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
+          // The configured class does not implement TableDescriptorsProvider.
+          return config;
+        }
+
+        TableDescriptorsProvider tableDescriptorsProvider =
+            Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
+        List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
+        return new MapConfig(Arrays.asList(config, TableConfigGenerator.generateConfigsForTableDescs(tableDescs)));
+      } catch (Exception e) {
+        throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
+            tableDescriptorsProviderClassName), e);
+      }
+    }
+  }
+}