You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:39:05 UTC

[beam] 43/50: Cleaning

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 758c1ce371e9dc41018fa5c1668cb24cc1751c99
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 28 10:24:11 2018 +0100

    Cleaning
---
 .../translation/batch/DatasetSourceBatch.java      |   3 +-
 .../streaming/DatasetStreamingSource.java          | 172 +--------------------
 2 files changed, 2 insertions(+), 173 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 1ad16eb..f4cd885 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -41,8 +41,7 @@ import org.apache.spark.sql.types.StructType;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
- * class is just a mix-in.
+ * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}.
  */
 public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
index 8701a83..6947b6d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
@@ -53,8 +53,7 @@ import scala.collection.immutable.Map;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
- * class is just a mix-in.
+ * is tagged experimental in spark, this class does not implement {@link ContinuousReadSupport}.
  */
 public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport{
 
@@ -196,173 +195,4 @@ public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSu
       reader.close();
     }
   }
-
-  private static class DatasetCatalog<T> extends Catalog {
-
-    TranslationContext context;
-    Source<T> source;
-
-    private DatasetCatalog(TranslationContext context, Source<T> source) {
-      this.context = context;
-      this.source = source;
-    }
-
-    @Override public String currentDatabase() {
-      return null;
-    }
-
-    @Override public void setCurrentDatabase(String dbName) {
-
-    }
-
-    @Override public Dataset<Database> listDatabases() {
-      return null;
-    }
-
-    @Override public Dataset<Table> listTables() {
-      return null;
-    }
-
-    @Override public Dataset<Table> listTables(String dbName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Dataset<Function> listFunctions() {
-      return null;
-    }
-
-    @Override public Dataset<Function> listFunctions(String dbName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Dataset<Column> listColumns(String tableName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Dataset<Column> listColumns(String dbName, String tableName)
-        throws AnalysisException {
-      return null;
-    }
-
-    @Override public Database getDatabase(String dbName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Table getTable(String tableName) throws AnalysisException {
-      return new DatasetTable<>("beam", "beaam", "beam fake table to wire up with Beam sources",
-          null, true, source, context);
-    }
-
-    @Override public Table getTable(String dbName, String tableName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Function getFunction(String functionName) throws AnalysisException {
-      return null;
-    }
-
-    @Override public Function getFunction(String dbName, String functionName)
-        throws AnalysisException {
-      return null;
-    }
-
-    @Override public boolean databaseExists(String dbName) {
-      return false;
-    }
-
-    @Override public boolean tableExists(String tableName) {
-      return false;
-    }
-
-    @Override public boolean tableExists(String dbName, String tableName) {
-      return false;
-    }
-
-    @Override public boolean functionExists(String functionName) {
-      return false;
-    }
-
-    @Override public boolean functionExists(String dbName, String functionName) {
-      return false;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String path) {
-      return null;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String path, String source) {
-      return null;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String source,
-        Map<String, String> options) {
-      return null;
-    }
-
-    @Override public Dataset<Row> createTable(String tableName, String source, StructType schema,
-        Map<String, String> options) {
-      return null;
-    }
-
-    @Override public boolean dropTempView(String viewName) {
-      return false;
-    }
-
-    @Override public boolean dropGlobalTempView(String viewName) {
-      return false;
-    }
-
-    @Override public void recoverPartitions(String tableName) {
-
-    }
-
-    @Override public boolean isCached(String tableName) {
-      return false;
-    }
-
-    @Override public void cacheTable(String tableName) {
-
-    }
-
-    @Override public void cacheTable(String tableName, StorageLevel storageLevel) {
-
-    }
-
-    @Override public void uncacheTable(String tableName) {
-
-    }
-
-    @Override public void clearCache() {
-
-    }
-
-    @Override public void refreshTable(String tableName) {
-
-    }
-
-    @Override public void refreshByPath(String path) {
-
-    }
-
-    private static class DatasetTable<T> extends Table {
-
-      private Source<T> source;
-      private TranslationContext context;
-
-      public DatasetTable(String name, String database, String description, String tableType,
-          boolean isTemporary, Source<T> source, TranslationContext context) {
-        super(name, database, description, tableType, isTemporary);
-        this.source = source;
-        this.context = context;
-      }
-
-      private Source<T> getSource() {
-        return source;
-      }
-
-      private TranslationContext getContext() {
-        return context;
-      }
-    }
-  }
 }