You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/07 13:44:46 UTC

[kylin] branch master updated: KYLIN-3485 Make unloading table more flexible

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 050f1c1  KYLIN-3485 Make unloading table more flexible
050f1c1 is described below

commit 050f1c189bf328e72ac690ee7cf4316747102f6a
Author: Rongchuan Jin <ro...@RongchuanJins-MacBook-Pro.local>
AuthorDate: Tue Aug 7 19:56:34 2018 +0800

    KYLIN-3485 Make unloading table more flexible
---
 .../main/java/org/apache/kylin/source/ISource.java |  6 ++++++
 .../apache/kylin/rest/service/TableService.java    | 23 +++++-----------------
 .../org/apache/kylin/source/hive/HiveSource.java   |  5 +++++
 .../org/apache/kylin/source/jdbc/JdbcSource.java   |  5 +++++
 .../org/apache/kylin/source/kafka/KafkaSource.java | 14 +++++++++++++
 5 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 2c5a922..f79d0f0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.source;
 
 import java.io.Closeable;
+import java.io.IOException;
 
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -55,4 +56,9 @@ public interface ISource extends Closeable {
      * For testing purpose.
      */
     ISampleDataDeployer getSampleDataDeployer();
+
+    /**
+     * Unloads a table from a project: the source removes whatever source-specific metadata it keeps for the table (may be a no-op).
+     */
+    void unloadTable(String tableName, String project) throws IOException;
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 786daa6..3c661f2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeManager;
@@ -53,7 +54,6 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
@@ -62,11 +62,11 @@ import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.response.TableSnapshotResponse;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -277,8 +277,6 @@ public class TableService extends BasicService {
             return false;
         }
 
-        tableType = desc.getSourceType();
-
         if (!modelService.isTableInModel(desc, project)) {
             removeTableFromProject(tableName, project);
             rtn = true;
@@ -293,20 +291,9 @@ public class TableService extends BasicService {
         metaMgr.removeSourceTable(tableName, project);
 
         // remove streaming info
-        if (tableType == 1) {
-            StreamingConfig config = null;
-            KafkaConfig kafkaConfig = null;
-            try {
-                config = streamingService.getStreamingManager().getStreamingConfig(tableName);
-                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName, project);
-                streamingService.dropStreamingConfig(config, project);
-                kafkaConfigService.dropKafkaConfig(kafkaConfig, project);
-                rtn = true;
-            } catch (Exception e) {
-                rtn = false;
-                logger.error(e.getLocalizedMessage(), e);
-            }
-        }
+        SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
+        ISource source = sourceManager.getCachedSource(desc);
+        source.unloadTable(tableName, project);
         return rtn;
     }
 
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index daf93d3..938114c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -83,6 +83,11 @@ public class HiveSource implements ISource {
     }
 
     @Override
+    public void unloadTable(String tableName, String project) throws IOException {
+        // no-op: this source removes no additional metadata when a table is unloaded
+    }
+
+    @Override
     public void close() throws IOException {
         // not needed
     }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index ae3bbc5..37d119e 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -68,6 +68,11 @@ public class JdbcSource implements ISource {
     }
 
     @Override
+    public void unloadTable(String tableName, String project) throws IOException {
+        // no-op: this source removes no additional metadata when a table is unloaded
+    }
+
+    @Override
     public void close() throws IOException {
         // not needed
     }
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 70d37aa..264f2ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.ISampleDataDeployer;
 import org.apache.kylin.source.ISource;
@@ -247,6 +248,19 @@ public class KafkaSource implements ISource {
     }
 
+    @Override
+    public void unloadTable(String tableName, String project) throws IOException {
+        StreamingConfig config;
+        KafkaConfig kafkaConfig;
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); // both managers below are resolved from this config
+        StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
+        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+        config = streamingManager.getStreamingConfig(tableName); // looked up by table name only; 'project' is not used in this method
+        kafkaConfig = kafkaConfigManager.getKafkaConfig(tableName);
+        streamingManager.removeStreamingConfig(config); // NOTE(review): no null check on the lookup result - confirm it cannot be null
+        kafkaConfigManager.removeKafkaConfig(kafkaConfig); // NOTE(review): same as above; a failure here now propagates to the caller
+    }
+
+    @Override
     public void close() throws IOException {
         // not needed
     }