You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/18 22:21:20 UTC

samza git commit: SAMZA-1953: Remove TableDescriptorsProvider interface

Repository: samza
Updated Branches:
  refs/heads/master 0c2076cf8 -> de91463e2


SAMZA-1953: Remove TableDescriptorsProvider interface

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #742 from atoomula/tablefixes1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/de91463e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/de91463e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/de91463e

Branch: refs/heads/master
Commit: de91463e2380a466d8477975475d81e5e35a2d34
Parents: 0c2076c
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Oct 18 15:21:16 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 18 15:21:16 2018 -0700

----------------------------------------------------------------------
 .../samza/table/TableDescriptorsProvider.java   |  98 -----------
 .../samza/sql/interfaces/DslConverter.java      |   1 +
 .../interfaces/SamzaSqlJavaTypeFactoryImpl.java |   8 +-
 .../table/TestTableDescriptorsProvider.java     | 172 -------------------
 4 files changed, 5 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
deleted file mode 100644
index 296edf4..0000000
--- a/samza-api/src/main/java/org/apache/samza/table/TableDescriptorsProvider.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.List;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.table.descriptors.TableDescriptor;
-
-
-/**
- * Provider to create a list of {@link TableDescriptor} objects to describe one or more Samza tables. This is the
- * mechanism for providing table support for Samza low level API.
- *
- * Developers writing Samza jobs using Samza table(s) should describe the table(s) by implementing
- * TableDescriptorsProvider.
- *
- * Typical user code using Samza tables should look like the following:
- *
- * <pre>
- * {@code
- * public class SampleTableDescriptorsProvider implements TableDescriptorsProvider {
- *   private ReadableTable<String, Long> remoteTable;
- *   private ReadWriteTable<String, String> localTable;
- *
- *   {@code @Override}
- *   public List<TableDescriptor> getTableDescriptors() {
- *     List<TableDescriptor> tableDescriptors = new ArrayList<>();
- *     final TableReadFunction readRemoteTableFn = new MyStoreReadFunction();
- *     tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
- *       .withReadFunction(readRemoteTableFn);
- *
- *     tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new LongSerde(), new StringSerde<>()))
- *       .withBlockSize(4096)
- *       .withConfig("some-key", "some-value");
- *     return tableDescriptors;
- *   }
- * }
- * }
- * </pre>
- *
- * [TODO:SAMZA-1772] will complete the work of introducing low-level Table API. Until then, Table API in low-level
- * could be used by generating configs from TableDescriptorsProvider (sample code below) through config rewriter.
- *
- * <pre>
- * {@code
- * private Map<String, String> generateTableConfigs(Config config) {
- *   String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
- *   if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
- *      // tableDescriptorsProviderClass is not configured
- *      return config;
- *   }
- *
- *   try {
- *      if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
- *         LOG.warn("TableDescriptorsProvider class {} does not implement TableDescriptosProvider.",
- *            tableDescriptorsProviderClassName);
- *         return config;
- *      }
- *
- *      TableDescriptorsProvider tableDescriptorsProvider =
- *          Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
- *      List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
- *      return TableConfigGenerator.generateConfigsForTableDescs(tableDescs);
- *   } catch (Exception e) {
- *      throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
- *          tableDescriptorsProviderClassName), e);
- *   }
- * }
- * }
- * </pre>
- */
-@InterfaceStability.Unstable
-public interface TableDescriptorsProvider {
-  /**
-   * Constructs instances of the table descriptors
-   * @param config the job config
-   * @return list of table descriptors
-   */
-  List<TableDescriptor> getTableDescriptors(Config config);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
index fc2ca8e..53e4246 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/DslConverter.java
@@ -30,6 +30,7 @@ public interface DslConverter {
 
   /**
    * Convert the dsl into the Calcite logical plan.
+   * @param dsl String containing one or more dsl queries.
    * @return List of Root nodes of the Calcite logical plan.
    * If DSL represents multiple SQL statements. You might return root nodes one for each SQL statement.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
index 50001c6..3046615 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaSqlJavaTypeFactoryImpl.java
@@ -45,15 +45,15 @@ public class SamzaSqlJavaTypeFactoryImpl
 
   @Override
   public RelDataType toSql(RelDataType type) {
-    return toSql(this, type);
+    return convertToSql(this, type);
   }
 
-  /** Converts a type in Java format to a SQL-oriented type. */
-  public static RelDataType toSql(final RelDataTypeFactory typeFactory,
+  // Converts a type in Java format to a SQL-oriented type.
+  private static RelDataType convertToSql(final RelDataTypeFactory typeFactory,
       RelDataType type) {
     if (type instanceof RelRecordType) {
       return typeFactory.createStructType(
-          Lists.transform(type.getFieldList(), a0 -> toSql(typeFactory, a0.getType())),
+          Lists.transform(type.getFieldList(), a0 -> convertToSql(typeFactory, a0.getType())),
           type.getFieldNames());
     }
     if (type instanceof JavaType) {

http://git-wip-us.apache.org/repos/asf/samza/blob/de91463e/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
deleted file mode 100644
index f4c05e4..0000000
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.table;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.ConfigRewriter;
-import org.apache.samza.config.JavaStorageConfig;
-import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.LongSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
-import org.apache.samza.storage.kv.descriptors.RocksDbTableProviderFactory;
-import org.apache.samza.table.TableConfigGenerator;
-import org.apache.samza.table.TableDescriptorsProvider;
-import org.apache.samza.table.remote.descriptors.RemoteTableDescriptor;
-import org.apache.samza.table.remote.descriptors.RemoteTableProviderFactory;
-import org.apache.samza.table.remote.TableReadFunction;
-import org.apache.samza.util.RateLimiter;
-import org.apache.samza.util.Util;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * Table descriptors provider tests for both remote and local tables
- */
-public class TestTableDescriptorsProvider {
-
-  @Test
-  public void testWithNoConfiguredTableDescriptorProviderClass() throws Exception {
-    Map<String, String> configs = new HashMap<>();
-    String tableRewriterName = "tableRewriter";
-    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
-    Assert.assertTrue(resultConfig.size() == 0);
-  }
-
-  @Test
-  public void testWithNonTableDescriptorsProviderClass() throws Exception {
-    Map<String, String> configs = new HashMap<>();
-    String tableRewriterName = "tableRewriter";
-    configs.put("tables.descriptors.provider.class", MySampleNonTableDescriptorsProvider.class.getName());
-    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
-    Assert.assertTrue(resultConfig.size() == 1);
-    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
-    Assert.assertTrue(tableConfig.getTableIds().size() == 0);
-  }
-
-  @Test
-  public void testWithTableDescriptorsProviderClass() throws Exception {
-    Map<String, String> configs = new HashMap<>();
-    String tableRewriterName = "tableRewriter";
-    String jobName = "test-job";
-    configs.put(JobConfig.JOB_NAME(), jobName);
-    configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName());
-    Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
-    Assert.assertNotNull(resultConfig);
-    Assert.assertTrue(!resultConfig.isEmpty());
-
-    String localTableId = "local-table-1";
-    String remoteTableId = "remote-table-1";
-
-    JavaStorageConfig storageConfig = new JavaStorageConfig(resultConfig);
-    Assert.assertTrue(storageConfig.getStoreNames().size() == 1);
-    Assert.assertEquals(storageConfig.getStoreNames().get(0), localTableId);
-    Assert.assertEquals(storageConfig.getStorageFactoryClassName(localTableId),
-        RocksDbKeyValueStorageEngineFactory.class.getName());
-    Config storeConfig = resultConfig.subset("stores." + localTableId + ".", true);
-    Assert.assertEquals(4, storeConfig.size());
-    Assert.assertEquals(4096, storeConfig.getInt("rocksdb.block.size.bytes"));
-
-    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
-    Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId),
-        RocksDbTableProviderFactory.class.getName());
-    Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId),
-        RemoteTableProviderFactory.class.getName());
-    Assert.assertEquals(tableConfig.getTableProviderFactory(localTableId), RocksDbTableProviderFactory.class.getName());
-    Assert.assertEquals(tableConfig.getTableProviderFactory(remoteTableId), RemoteTableProviderFactory.class.getName());
-  }
-
-  public static class MySampleNonTableDescriptorsProvider {
-  }
-
-  static class MyReadFunction implements TableReadFunction {
-    @Override
-    public CompletableFuture getAsync(Object key) {
-      return null;
-    }
-
-    @Override
-    public boolean isRetriable(Throwable exception) {
-      return false;
-    }
-  }
-
-  public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider {
-    @Override
-    public List<TableDescriptor> getTableDescriptors(Config config) {
-      List<TableDescriptor> tableDescriptors = new ArrayList<>();
-      final RateLimiter readRateLimiter = mock(RateLimiter.class);
-      final MyReadFunction readFn = new MyReadFunction();
-
-      tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1", KVSerde.of(new StringSerde(), new LongSerde()))
-          .withReadFunction(readFn)
-          .withRateLimiter(readRateLimiter, null, null));
-      tableDescriptors.add(new RocksDbTableDescriptor("local-table-1", KVSerde.of(new StringSerde(), new StringSerde()))
-          .withBlockSize(4096));
-      return tableDescriptors;
-    }
-  }
-
-  /**
-   * A sample config rewriter to generate table configs. It instantiates the configured tableDescriptorsProvider class
-   * which implements {@link TableDescriptorsProvider} and generates the table configs.
-   */
-  public static class MySampleTableConfigRewriter implements ConfigRewriter {
-
-    @Override
-    public Config rewrite(String name, Config config) {
-      String tableDescriptorsProviderClassName = config.get("tables.descriptors.provider.class");
-      if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
-        // tableDescriptorsProviderClass is not configured
-        return config;
-      }
-
-      try {
-        if (!TableDescriptorsProvider.class.isAssignableFrom(Class.forName(tableDescriptorsProviderClassName))) {
-          // The configured class does not implement TableDescriptorsProvider.
-          return config;
-        }
-
-        TableDescriptorsProvider tableDescriptorsProvider =
-            Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
-        List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
-        return new MapConfig(Arrays.asList(config, TableConfigGenerator.generateConfigsForTableDescs(config, tableDescs)));
-      } catch (Exception e) {
-        throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
-            tableDescriptorsProviderClassName), e);
-      }
-    }
-  }
-}