You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:39:05 UTC
[beam] 43/50: Cleaning
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 758c1ce371e9dc41018fa5c1668cb24cc1751c99
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Dec 28 10:24:11 2018 +0100
Cleaning
---
.../translation/batch/DatasetSourceBatch.java | 3 +-
.../streaming/DatasetStreamingSource.java | 172 +--------------------
2 files changed, 2 insertions(+), 173 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index 1ad16eb..f4cd885 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -41,8 +41,7 @@ import org.apache.spark.sql.types.StructType;
/**
* This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
- * class is just a mix-in.
+ * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
*/
public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
index 8701a83..6947b6d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
@@ -53,8 +53,7 @@ import scala.collection.immutable.Map;
/**
* This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
- * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}. This
- * class is just a mix-in.
+ * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
*/
public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSupport{
@@ -196,173 +195,4 @@ public class DatasetStreamingSource<T> implements DataSourceV2, MicroBatchReadSu
reader.close();
}
}
-
- private static class DatasetCatalog<T> extends Catalog {
-
- TranslationContext context;
- Source<T> source;
-
- private DatasetCatalog(TranslationContext context, Source<T> source) {
- this.context = context;
- this.source = source;
- }
-
- @Override public String currentDatabase() {
- return null;
- }
-
- @Override public void setCurrentDatabase(String dbName) {
-
- }
-
- @Override public Dataset<Database> listDatabases() {
- return null;
- }
-
- @Override public Dataset<Table> listTables() {
- return null;
- }
-
- @Override public Dataset<Table> listTables(String dbName) throws AnalysisException {
- return null;
- }
-
- @Override public Dataset<Function> listFunctions() {
- return null;
- }
-
- @Override public Dataset<Function> listFunctions(String dbName) throws AnalysisException {
- return null;
- }
-
- @Override public Dataset<Column> listColumns(String tableName) throws AnalysisException {
- return null;
- }
-
- @Override public Dataset<Column> listColumns(String dbName, String tableName)
- throws AnalysisException {
- return null;
- }
-
- @Override public Database getDatabase(String dbName) throws AnalysisException {
- return null;
- }
-
- @Override public Table getTable(String tableName) throws AnalysisException {
- return new DatasetTable<>("beam", "beaam", "beam fake table to wire up with Beam sources",
- null, true, source, context);
- }
-
- @Override public Table getTable(String dbName, String tableName) throws AnalysisException {
- return null;
- }
-
- @Override public Function getFunction(String functionName) throws AnalysisException {
- return null;
- }
-
- @Override public Function getFunction(String dbName, String functionName)
- throws AnalysisException {
- return null;
- }
-
- @Override public boolean databaseExists(String dbName) {
- return false;
- }
-
- @Override public boolean tableExists(String tableName) {
- return false;
- }
-
- @Override public boolean tableExists(String dbName, String tableName) {
- return false;
- }
-
- @Override public boolean functionExists(String functionName) {
- return false;
- }
-
- @Override public boolean functionExists(String dbName, String functionName) {
- return false;
- }
-
- @Override public Dataset<Row> createTable(String tableName, String path) {
- return null;
- }
-
- @Override public Dataset<Row> createTable(String tableName, String path, String source) {
- return null;
- }
-
- @Override public Dataset<Row> createTable(String tableName, String source,
- Map<String, String> options) {
- return null;
- }
-
- @Override public Dataset<Row> createTable(String tableName, String source, StructType schema,
- Map<String, String> options) {
- return null;
- }
-
- @Override public boolean dropTempView(String viewName) {
- return false;
- }
-
- @Override public boolean dropGlobalTempView(String viewName) {
- return false;
- }
-
- @Override public void recoverPartitions(String tableName) {
-
- }
-
- @Override public boolean isCached(String tableName) {
- return false;
- }
-
- @Override public void cacheTable(String tableName) {
-
- }
-
- @Override public void cacheTable(String tableName, StorageLevel storageLevel) {
-
- }
-
- @Override public void uncacheTable(String tableName) {
-
- }
-
- @Override public void clearCache() {
-
- }
-
- @Override public void refreshTable(String tableName) {
-
- }
-
- @Override public void refreshByPath(String path) {
-
- }
-
- private static class DatasetTable<T> extends Table {
-
- private Source<T> source;
- private TranslationContext context;
-
- public DatasetTable(String name, String database, String description, String tableType,
- boolean isTemporary, Source<T> source, TranslationContext context) {
- super(name, database, description, tableType, isTemporary);
- this.source = source;
- this.context = context;
- }
-
- private Source<T> getSource() {
- return source;
- }
-
- private TranslationContext getContext() {
- return context;
- }
- }
- }
}