You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/18 22:21:20 UTC
samza git commit: SAMZA-1953: Remove TableDescriptorsProvider
interface
Repository: samza
Updated Branches:
refs/heads/master 0c2076cf8 -> de91463e2
SAMZA-1953: Remove TableDescriptorsProvider interface
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #742 from atoomula/tablefixes1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/de91463e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/de91463e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/de91463e
Branch: refs/heads/master
Commit: de91463e2380a466d8477975475d81e5e35a2d34
Parents: 0c2076c
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Oct 18 15:21:16 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 18 15:21:16 2018 -0700
----------------------------------------------------------------------
.../samza/table/TableDescriptorsProvider.java | 98 -----------
.../samza/sql/interfaces/DslConverter.java | 1 +
.../interfaces/SamzaSqlJavaTypeFactoryImpl.java | 8 +-
.../table/TestTableDescriptorsProvider.java | 172 -------------------
4 files changed, 5 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
deleted file mode 100644
index 296edf4..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.List;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.table.descriptors.TableDescriptor;
-
-
-/**
- * Provider to create a list of {@link TableDescriptor} objects to describe one or more Samza tables. This is the
- * mechanism for providing table support for Samza low level API.
- *
- * Developers writing Samza jobs using Samza table(s) should describe the table(s) by implementing
- * TableDescriptorsProvider.
- *
- * Typical user code using Samza tables should look like the following:
- *
- * <pre>
- * {@code
- * public class SampleTableDescriptorsProvider implements TableDescriptorsProvider {
- * private ReadableTable<String, Long> remoteTable;
- * private ReadWriteTable<String, String> localTable;
- *
- * {@code @Override}
- * public List<TableDescriptor> getTableDescriptors() {
- * List<TableDescriptor> tableDescriptors = new ArrayList<>();
- * final TableReadFunction readRemoteTableFn = new MyStoreReadFunction();
- * tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
- * .withReadFunction(readRemoteTableFn);
- *
- * tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new LongSerde(), new StringSerde<>()))
- * .withBlockSize(4096)
- * .withConfig("some-key", "some-value");
- * return tableDescriptors;
- * }
- * }
- * }
- * </pre>
- *
- * [TODO:SAMZA-1772] will complete the work of introducing low-level Table API. Until then, Table API in low-level
- * could be used by generating configs from TableDescriptorsProvider (sample code below) through config rewriter.
- *
- * <pre>
- * {@code
- * private Map<String, String> generateTableConfigs(Config config) {
- * String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
- * if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
- * // tableDescriptorsProviderClass is not configured
- * return config;
- * }
- *
- * try {
- * if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
- * LOG.warn("TableDescriptorsProvider class {} does not implement TableDescriptosProvider.",
- * tableDescriptorsProviderClassName);
- * return config;
- * }
- *
- * TableDescriptorsProvider tableDescriptorsProvider =
- * Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
- * List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
- * return TableConfigGenerator.generateConfigsForTableDescs(tableDescs);
- * } catch (Exception e) {
- * throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
- * tableDescriptorsProviderClassName), e);
- * }
- * }
- * }
- * </pre>
- */
-@InterfaceStability.Unstable
-public interface TableDescriptorsProvider {
- /**
- * Constructs instances of the table descriptors
- * @param config the job config
- * @return list of table descriptors
- */
- List<TableDescriptor> getTableDescriptors(Config config);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
index fc2ca8e..53e4246 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
@@ -30,6 +30,7 @@ public interface DslConverter {
/**
* Convert the dsl into the Calcite logical plan.
+ * @param dsl String containing one or more dsl queries.
* @return List of Root nodes of the Calcite logical plan.
* If DSL represents multiple SQL statements. You might return root nodes one for each SQL statement.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
index 50001c6..3046615 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
@@ -45,15 +45,15 @@ public class SamzaSqlJavaTypeFactoryImpl
@Override
public RelDataType toSql(RelDataType type) {
- return toSql(this, type);
+ return convertToSql(this, type);
}
- /** Converts a type in Java format to a SQL-oriented type. */
- public static RelDataType toSql(final RelDataTypeFactory typeFactory,
+ // Converts a type in Java format to a SQL-oriented type.
+ private static RelDataType convertToSql(final RelDataTypeFactory typeFactory,
RelDataType type) {
if (type instanceof RelRecordType) {
return typeFactory.createStructType(
- Lists.transform(type.getFieldList(), a0 -> toSql(typeFactory, a0.getType())),
+ Lists.transform(type.getFieldList(), a0 -> convertToSql(typeFactory, a0.getType())),
type.getFieldNames());
}
if (type instanceof JavaType) {
http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
deleted file mode 100644
index f4c05e4..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.ConfigRewriter;
-import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.LongSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableProviderFactory;
-import org.apache.samza.table.TableConfigGenerator;
-import org.apache.samza.table.TableDescriptorsProvider;
-import org.apache.samza.table.remote.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.remote.descriptors.RemoteTableProviderFactory;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.util.RateLimiter;
-import org.apache.samza.util.Util;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * Table descriptors provider tests for both remote and local tables
- */
-public class TestTableDescriptorsProvider {
-
- @Test
- public void testWithNoConfiguredTableDescriptorProviderClass() throws Exception {
- Map<String, String> configs = new HashMap<>();
- String tableRewriterName = "tableRewriter";
- Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
- Assert.assertTrue(resultConfig.size() == 0);
- }
-
- @Test
- public void testWithNonTableDescriptorsProviderClass() throws Exception {
- Map<String, String> configs = new HashMap<>();
- String tableRewriterName = "tableRewriter";
- configs.put("tables.descriptors.provider.class", MySampleNonTableDescriptorsProvider.class.getName());
- Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
- Assert.assertTrue(resultConfig.size() == 1);
- JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
- Assert.assertTrue(tableConfig.getTableIds().size() == 0);
- }
-
- @Test
- public void testWithTableDescriptorsProviderClass() throws Exception {
- Map<String, String> configs = new HashMap<>();
- String tableRewriterName = "tableRewriter";
- String jobName = "test-job";
- configs.put(JobConfig.JOB_NAME(), jobName);
- configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName());
- Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
- Assert.assertNotNull(resultConfig);
- Assert.assertTrue(!resultConfig.isEmpty());
-
- String localTableId = "local-table-1";
- String remoteTableId = "remote-table-1";
-
- JavaStorageConfig storageConfig = new JavaStorageConfig(resultConfig);
- Assert.assertTrue(storageConfig.getStoreNames().size() == 1);
- Assert.assertEquals(storageConfig.getStoreNames().get(0), localTableId);
- Assert.assertEquals(storageConfig.getStorageFactoryClassName(localTableId),
- RocksDbKeyValueStorageEngineFactory.class.getName());
- Config storeConfig = resultConfig.subset("stores." + localTableId + ".", true);
- Assert.assertEquals(4, storeConfig.size());
- Assert.assertEquals(4096, storeConfig.getInt("rocksdb.block.size.bytes"));
-
- JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
- Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId),
- RocksDbTableProviderFactory.class.getName());
- Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId),
- RemoteTableProviderFactory.class.getName());
- Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), RocksDbTableProviderFactory.class.getName());
- Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), RemoteTableProviderFactory.class.getName());
- }
-
- public static class MySampleNonTableDescriptorsProvider {
- }
-
- static class MyReadFunction implements TableReadFunction {
- @Override
- public CompletableFuture getAsync(Object key) {
- return null;
- }
-
- @Override
- public boolean isRetriable(Throwable exception) {
- return false;
- }
- }
-
- public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider {
- @Override
- public List<TableDescriptor> getTableDescriptors(Config config) {
- List<TableDescriptor> tableDescriptors = new ArrayList<>();
- final RateLimiter readRateLimiter = mock(RateLimiter.class);
- final MyReadFunction readFn = new MyReadFunction();
-
- tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new LongSerde()))
- .withReadFunction(readFn)
- .withRateLimiter(readRateLimiter, null, null));
- tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
- .withBlockSize(4096));
- return tableDescriptors;
- }
- }
-
- /**
- * A sample config rewriter to generate table configs. It instantiates the configured tableDescriptorsProvider class
- * which implements {@link TableDescriptorsProvider} and generates the table configs.
- */
- public static class MySampleTableConfigRewriter implements ConfigRewriter {
-
- @Override
- public Config rewrite(String name, Config config) {
- String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
- if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
- // tableDescriptorsProviderClass is not configured
- return config;
- }
-
- try {
- if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
- // The configured class does not implement TableDescriptorsProvider.
- return config;
- }
-
- TableDescriptorsProvider tableDescriptorsProvider =
- Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
- List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
- return new MapConfig(Arrays.asList(config, TableConfigGenerator.generateConfigsForTableDescs(config, tableDescs)));
- } catch (Exception e) {
- throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
- tableDescriptorsProviderClassName), e);
- }
- }
- }
-}