You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/07 13:44:46 UTC
[kylin] branch master updated: KYLIN-3485 Make unloading table more
flexible
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 050f1c1 KYLIN-3485 Make unloading table more flexible
050f1c1 is described below
commit 050f1c189bf328e72ac690ee7cf4316747102f6a
Author: Rongchuan Jin <ro...@RongchuanJins-MacBook-Pro.local>
AuthorDate: Tue Aug 7 19:56:34 2018 +0800
KYLIN-3485 Make unloading table more flexible
---
.../main/java/org/apache/kylin/source/ISource.java | 6 ++++++
.../apache/kylin/rest/service/TableService.java | 23 +++++-----------------
.../org/apache/kylin/source/hive/HiveSource.java | 5 +++++
.../org/apache/kylin/source/jdbc/JdbcSource.java | 5 +++++
.../org/apache/kylin/source/kafka/KafkaSource.java | 14 +++++++++++++
5 files changed, 35 insertions(+), 18 deletions(-)
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
index 2c5a922..f79d0f0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java
@@ -19,6 +19,7 @@
package org.apache.kylin.source;
import java.io.Closeable;
+import java.io.IOException;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.TableDesc;
@@ -55,4 +56,9 @@ public interface ISource extends Closeable {
* For testing purpose.
*/
ISampleDataDeployer getSampleDataDeployer();
+
+ /**
+ * Unload table.
+ */
+ void unloadTable(String tableName, String project) throws IOException;
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 786daa6..3c661f2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -32,6 +32,7 @@ import java.util.UUID;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeManager;
@@ -53,7 +54,6 @@ import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
@@ -62,11 +62,11 @@ import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.response.TableSnapshotResponse;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -277,8 +277,6 @@ public class TableService extends BasicService {
return false;
}
- tableType = desc.getSourceType();
-
if (!modelService.isTableInModel(desc, project)) {
removeTableFromProject(tableName, project);
rtn = true;
@@ -293,20 +291,9 @@ public class TableService extends BasicService {
metaMgr.removeSourceTable(tableName, project);
// remove streaming info
- if (tableType == 1) {
- StreamingConfig config = null;
- KafkaConfig kafkaConfig = null;
- try {
- config = streamingService.getStreamingManager().getStreamingConfig(tableName);
- kafkaConfig = kafkaConfigService.getKafkaConfig(tableName, project);
- streamingService.dropStreamingConfig(config, project);
- kafkaConfigService.dropKafkaConfig(kafkaConfig, project);
- rtn = true;
- } catch (Exception e) {
- rtn = false;
- logger.error(e.getLocalizedMessage(), e);
- }
- }
+ SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv());
+ ISource source = sourceManager.getCachedSource(desc);
+ source.unloadTable(tableName, project);
return rtn;
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
index daf93d3..938114c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java
@@ -83,6 +83,11 @@ public class HiveSource implements ISource {
}
@Override
+ public void unloadTable(String tableName, String project) throws IOException {
+
+ }
+
+ @Override
public void close() throws IOException {
// not needed
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index ae3bbc5..37d119e 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -68,6 +68,11 @@ public class JdbcSource implements ISource {
}
@Override
+ public void unloadTable(String tableName, String project) throws IOException {
+
+ }
+
+ @Override
public void close() throws IOException {
// not needed
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 70d37aa..264f2ce 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.ISource;
@@ -247,6 +248,19 @@ public class KafkaSource implements ISource {
}
@Override
+ public void unloadTable(String tableName, String project) throws IOException {
+ StreamingConfig config;
+ KafkaConfig kafkaConfig;
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig);
+ KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+ config = streamingManager.getStreamingConfig(tableName);
+ kafkaConfig = kafkaConfigManager.getKafkaConfig(tableName);
+ streamingManager.removeStreamingConfig(config);
+ kafkaConfigManager.removeKafkaConfig(kafkaConfig);
+ }
+
+ @Override
public void close() throws IOException {
// not needed
}