You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/19 22:06:30 UTC
samza git commit: SAMZA-1774: Support table API in low level
Repository: samza
Updated Branches:
refs/heads/master 7055ce670 -> 525e8e2d8
SAMZA-1774: Support table API in low level
Code changes to support table in low level API.
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srini P<sp...@linkedin.com>
Closes #556 from atoomula/table1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/525e8e2d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/525e8e2d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/525e8e2d
Branch: refs/heads/master
Commit: 525e8e2d8205c7e25d6ba3a13ed22521490496e3
Parents: 7055ce6
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Jul 19 15:06:27 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Thu Jul 19 15:06:27 2018 -0700
----------------------------------------------------------------------
.../samza/table/TableDescriptorsProvider.java | 100 +++++++++++
.../org/apache/samza/table/TableProvider.java | 3 +-
.../org/apache/samza/execution/JobNode.java | 41 +----
.../samza/table/TableConfigGenerator.java | 138 ++++++++++++++++
.../table/TestTableDescriptorsProvider.java | 164 +++++++++++++++++++
5 files changed, 405 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
new file mode 100644
index 0000000..766a4b4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TableDescriptor;
+
+
+/**
+ * Provider to create a list of {@link TableDescriptor} objects to describe one or more Samza tables. This is the
+ * mechanism for providing table support for Samza low level API.
+ *
+ * Developers writing Samza jobs using Samza table(s) should describe the table(s) by implementing
+ * TableDescriptorsProvider.
+ *
+ * Typical user code using Samza tables should look like the following:
+ *
+ * <pre>
+ * {@code
+ * public class SampleTableDescriptorsProvider implements TableDescriptorsProvider {
+ *   private ReadableTable<String, Long> remoteTable;
+ *   private ReadWriteTable<String, String> localTable;
+ *
+ *   {@code @Override}
+ *   public List<TableDescriptor> getTableDescriptors(Config config) {
+ *     List<TableDescriptor> tableDescriptors = new ArrayList<>();
+ *     final TableReadFunction readRemoteTableFn = new MyStoreReadFunction();
+ *     tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
+ *         .withReadFunction(readRemoteTableFn)
+ *         .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
+ *
+ *     tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
+ *         .withBlockSize(4096)
+ *         .withSerde(KVSerde.of(new StringSerde(), new StringSerde()))
+ *         .withConfig("some-key", "some-value"));
+ *     return tableDescriptors;
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * [TODO:SAMZA-1772] will complete the work of introducing low-level Table API. Until then, Table API in low-level
+ * could be used by generating configs from TableDescriptorsProvider (sample code below) through config rewriter.
+ *
+ * <pre>
+ * {@code
+ * private Map<String, String> generateTableConfigs(Config config) {
+ *   String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
+ *   if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
+ *     // tableDescriptorsProviderClass is not configured
+ *     return config;
+ *   }
+ *
+ *   try {
+ *     if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
+ *       LOG.warn("TableDescriptorsProvider class {} does not implement TableDescriptorsProvider.",
+ *           tableDescriptorsProviderClassName);
+ *       return config;
+ *     }
+ *
+ *     TableDescriptorsProvider tableDescriptorsProvider =
+ *         Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
+ *     List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
+ *     return TableConfigGenerator.generateConfigsForTableDescs(tableDescs);
+ *   } catch (Exception e) {
+ *     throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
+ *         tableDescriptorsProviderClassName), e);
+ *   }
+ * }
+ * }
+ * </pre>
+ */
+@InterfaceStability.Unstable
+public interface TableDescriptorsProvider {
+ /**
+ * Constructs instances of the table descriptors.
+ * @param config the current configuration
+ * @return list of table descriptors
+ */
+ List<TableDescriptor> getTableDescriptors(Config config);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
index bbbe38a..8e60dad 100644
--- a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
+++ b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
@@ -47,8 +47,7 @@ public interface TableProvider {
/**
* Generate any configuration for this table, the generated configuration
* is used by Samza container to construct this table and any components
- * necessary
- * .
+ * necessary.
* @param config the current configuration
* @return configuration for this table
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index db44d9f..6507996 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -31,7 +31,6 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
@@ -46,12 +45,11 @@ import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.StatefulOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.table.TableConfigGenerator;
import org.apache.samza.util.MathUtil;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.table.TableProviderFactory;
import org.apache.samza.table.TableSpec;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
@@ -179,21 +177,7 @@ public class JobNode {
// write serialized serde instances and stream serde configs to configs
addSerdeConfigs(configs);
- tables.forEach(tableSpec -> {
- // Table provider factory
- configs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
- tableSpec.getTableProviderFactoryClassName());
-
- // Note: no need to generate config for Serde's, as they are already produced by addSerdeConfigs()
-
- // Generate additional configuration
- TableProviderFactory tableProviderFactory =
- Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
- TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
- configs.putAll(tableProvider.generateConfig(configs));
- });
-
- log.info("Job {} has generated configs {}", jobName, configs);
+ configs.putAll(TableConfigGenerator.generateConfigsForTableSpecs(tables));
String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
@@ -255,21 +239,11 @@ public class JobNode {
}
});
- // collect all key and msg serde instances for tables
- Map<String, Serde> tableKeySerdes = new HashMap<>();
- Map<String, Serde> tableValueSerdes = new HashMap<>();
- tables.forEach(tableSpec -> {
- tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde());
- tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde());
- });
-
// for each unique stream or store serde instance, generate a unique name and serialize to config
HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
serdes.addAll(streamMsgSerdes.values());
serdes.addAll(storeKeySerdes.values());
serdes.addAll(storeMsgSerdes.values());
- serdes.addAll(tableKeySerdes.values());
- serdes.addAll(tableValueSerdes.values());
SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
Base64.Encoder base64Encoder = Base64.getEncoder();
Map<Serde, String> serdeUUIDs = new HashMap<>();
@@ -303,17 +277,6 @@ public class JobNode {
String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
});
-
- // set key and msg serdes for tables to the serde names generated above
- tableKeySerdes.forEach((tableId, serde) -> {
- String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
- configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
- });
-
- tableValueSerdes.forEach((tableId, serde) -> {
- String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
- configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
- });
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
new file mode 100644
index 0000000..ac17c68
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/table/TableConfigGenerator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class to generate table configs.
+ */
+public class TableConfigGenerator {
+ private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);
+
+ /**
+ * Generate table configurations given a list of table descriptors
+ * @param tableDescriptors the list of tableDescriptors
+ * @return configuration for the tables
+ */
+ static public Map<String, String> generateConfigsForTableDescs(List<TableDescriptor> tableDescriptors) {
+ return generateConfigsForTableSpecs(getTableSpecs(tableDescriptors));
+ }
+
+ /**
+ * Generate table configurations given a list of table specs
+ * @param tableSpecs the list of tableSpecs
+ * @return configuration for the tables
+ */
+ static public Map<String, String> generateConfigsForTableSpecs(List<TableSpec> tableSpecs) {
+ Map<String, String> tableConfigs = new HashMap<>();
+
+ tableConfigs.putAll(generateTableKVSerdeConfigs(tableSpecs));
+
+ tableSpecs.forEach(tableSpec -> {
+ // Add table provider factory config
+ tableConfigs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
+ tableSpec.getTableProviderFactoryClassName());
+
+ // Generate additional configuration
+ TableProviderFactory tableProviderFactory =
+ Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
+ TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
+ tableConfigs.putAll(tableProvider.generateConfig(tableConfigs));
+ });
+
+ LOG.info("TableConfigGenerator has generated configs {}", tableConfigs);
+ return tableConfigs;
+ }
+
+ static private Map<String, String> generateTableKVSerdeConfigs(List<TableSpec> tableSpecs) {
+ Map<String, String> serdeConfigs = new HashMap<>();
+
+ // Collect key and msg serde instances for all the tables
+ Map<String, Serde> tableKeySerdes = new HashMap<>();
+ Map<String, Serde> tableValueSerdes = new HashMap<>();
+ HashSet<Serde> serdes = new HashSet<>();
+
+ tableSpecs.forEach(tableSpec -> {
+ tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde());
+ tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde());
+ });
+ serdes.addAll(tableKeySerdes.values());
+ serdes.addAll(tableValueSerdes.values());
+
+ // Generate serde names
+ SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+ Base64.Encoder base64Encoder = Base64.getEncoder();
+ Map<Serde, String> serdeUUIDs = new HashMap<>();
+ serdes.forEach(serde -> {
+ String serdeName = serdeUUIDs.computeIfAbsent(serde,
+ s -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
+ serdeConfigs.putIfAbsent(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName),
+ base64Encoder.encodeToString(serializableSerde.toBytes(serde)));
+ });
+
+ // Set key and msg serdes for tables to the serde names generated above
+ tableKeySerdes.forEach((tableId, serde) -> {
+ String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
+ serdeConfigs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+ });
+
+ tableValueSerdes.forEach((tableId, serde) -> {
+ String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
+ serdeConfigs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+ });
+
+ return serdeConfigs;
+ }
+
+ static private List<TableSpec> getTableSpecs(List<TableDescriptor> tableDescs) {
+ Map<TableSpec, TableImpl> tableSpecs = new LinkedHashMap<>();
+
+ tableDescs.forEach(tableDesc -> {
+ TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
+
+ if (tableSpecs.containsKey(tableSpec)) {
+ throw new IllegalStateException(
+ String.format("getTable() invoked multiple times with the same tableId: %s", tableDesc.getTableId()));
+ }
+ tableSpecs.put(tableSpec, new TableImpl(tableSpec));
+ });
+ return new ArrayList<>(tableSpecs.keySet());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/525e8e2d/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
new file mode 100644
index 0000000..3ed29ca
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.ConfigRewriter;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.LongSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.RocksDbTableProviderFactory;
+import org.apache.samza.table.TableConfigGenerator;
+import org.apache.samza.table.TableDescriptorsProvider;
+import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.remote.RemoteTableProviderFactory;
+import org.apache.samza.table.remote.TableReadFunction;
+
+import org.apache.samza.util.RateLimiter;
+import org.apache.samza.util.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Table descriptors provider tests for both remote and local tables. The tests drive a sample
+ * {@link ConfigRewriter} that turns the descriptors returned by a configured {@link TableDescriptorsProvider}
+ * into table configs.
+ */
+public class TestTableDescriptorsProvider {
+
+  /**
+   * Without a configured provider class the rewriter must hand back the (empty) input config untouched.
+   */
+  @Test
+  public void testWithNoConfiguredTableDescriptorProviderClass() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    String tableRewriterName = "tableRewriter";
+    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
+    Assert.assertEquals(0, resultConfig.size());
+  }
+
+  /**
+   * A configured class that does not implement {@link TableDescriptorsProvider} must not produce any table config;
+   * only the original provider-class entry remains.
+   */
+  @Test
+  public void testWithNonTableDescriptorsProviderClass() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    String tableRewriterName = "tableRewriter";
+    configs.put("tables.descriptors.provider.class", MySampleNonTableDescriptorsProvider.class.getName());
+    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
+    Assert.assertEquals(1, resultConfig.size());
+    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
+    Assert.assertEquals(0, tableConfig.getTableIds().size());
+  }
+
+  /**
+   * A valid provider must yield store, serde and provider-factory configs for both the local and the remote table.
+   */
+  @Test
+  public void testWithTableDescriptorsProviderClass() throws Exception {
+    Map<String, String> configs = new HashMap<>();
+    String tableRewriterName = "tableRewriter";
+    configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName());
+    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
+    // The result is the input config (one entry) merged with all generated table configs.
+    Assert.assertEquals(17, resultConfig.size());
+
+    String localTableId = "local-table-1";
+    String remoteTableId = "remote-table-1";
+
+    JavaStorageConfig storageConfig = new JavaStorageConfig(resultConfig);
+    Assert.assertEquals(1, storageConfig.getStoreNames().size());
+    Assert.assertEquals(localTableId, storageConfig.getStoreNames().get(0));
+    Assert.assertEquals(RocksDbKeyValueStorageEngineFactory.class.getName(),
+        storageConfig.getStorageFactoryClassName(localTableId));
+    Assert.assertTrue(storageConfig.getStorageKeySerde(localTableId).startsWith("StringSerde"));
+    Assert.assertTrue(storageConfig.getStorageMsgSerde(localTableId).startsWith("StringSerde"));
+    Config storeConfig = resultConfig.subset("stores." + localTableId + ".", true);
+    Assert.assertEquals(4, storeConfig.size());
+    Assert.assertEquals(4096, storeConfig.getInt("rocksdb.block.size.bytes"));
+
+    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
+    Assert.assertEquals(RocksDbTableProviderFactory.class.getName(),
+        tableConfig.getTableProviderFactory(localTableId));
+    Assert.assertEquals(RemoteTableProviderFactory.class.getName(),
+        tableConfig.getTableProviderFactory(remoteTableId));
+    Assert.assertTrue(tableConfig.getKeySerde(localTableId).startsWith("StringSerde"));
+    Assert.assertTrue(tableConfig.getValueSerde(localTableId).startsWith("StringSerde"));
+    Assert.assertTrue(tableConfig.getKeySerde(remoteTableId).startsWith("StringSerde"));
+    Assert.assertTrue(tableConfig.getValueSerde(remoteTableId).startsWith("LongSerde"));
+  }
+
+  /**
+   * A class that deliberately does not implement {@link TableDescriptorsProvider}.
+   */
+  public static class MySampleNonTableDescriptorsProvider {
+  }
+
+  /**
+   * A sample provider describing one remote table and one local RocksDb table.
+   */
+  public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider {
+    @Override
+    public List<TableDescriptor> getTableDescriptors(Config config) {
+      List<TableDescriptor> tableDescriptors = new ArrayList<>();
+      final RateLimiter readRateLimiter = mock(RateLimiter.class);
+      final TableReadFunction readRemoteTable = (TableReadFunction) key -> null;
+
+      tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1")
+          .withReadFunction(readRemoteTable)
+          .withRateLimiter(readRateLimiter, null, null)
+          .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
+      tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")
+          .withBlockSize(4096)
+          .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
+      return tableDescriptors;
+    }
+  }
+
+  /**
+   * A sample config rewriter to generate table configs. It instantiates the configured tableDescriptorsProvider class
+   * which implements {@link TableDescriptorsProvider} and generates the table configs.
+   */
+  public static class MySampleTableConfigRewriter implements ConfigRewriter {
+
+    /**
+     * Returns the input config merged with the generated table configs, or the input config unchanged when no
+     * provider class is configured or the configured class is not a {@link TableDescriptorsProvider}.
+     * @throws ConfigException if the provider cannot be loaded or the table configs cannot be generated
+     */
+    @Override
+    public Config rewrite(String name, Config config) {
+      String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
+      if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
+        // tableDescriptorsProviderClass is not configured
+        return config;
+      }
+
+      try {
+        if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
+          // The configured class does not implement TableDescriptorsProvider.
+          return config;
+        }
+
+        TableDescriptorsProvider tableDescriptorsProvider =
+            Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
+        List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
+        return new MapConfig(Arrays.asList(config, TableConfigGenerator.generateConfigsForTableDescs(tableDescs)));
+      } catch (Exception e) {
+        throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
+            tableDescriptorsProviderClassName), e);
+      }
+    }
+  }
+}