Posted to commits@kylin.apache.org by li...@apache.org on 2016/03/10 08:50:12 UTC
[09/50] [abbrv] kylin git commit: KYLIN-1340 CubeMetaExtractor support streaming case and skip segments
KYLIN-1340 CubeMetaExtractor support streaming case and skip segments
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c08ded6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c08ded6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c08ded6
Branch: refs/heads/master
Commit: 4c08ded63f78aad93eefa9814d48af2486725967
Parents: 2e1d2f6
Author: honma <ho...@ebay.com>
Authored: Wed Feb 24 15:45:38 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/job/CubeMetaExtractor.java | 327 +++++++++++++++++++
.../kylin/common/persistence/ResourceTool.java | 2 +-
.../engine/streaming/StreamingManager.java | 100 +-----
.../kylin/source/kafka/KafkaConfigManager.java | 47 +--
.../kylin/source/kafka/config/KafkaConfig.java | 4 +-
.../storage/hbase/util/CubeMetaExtractor.java | 284 ----------------
6 files changed, 345 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
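For orientation, a minimal, hypothetical invocation sketch of the relocated tool. The option names come from the code below; the cube name and destination directory are made up, and the call relies on the same public execute(String[]) that main() uses:

    // Hypothetical usage sketch of org.apache.kylin.job.CubeMetaExtractor.
    String[] args = new String[] {
            "-cube", "sample_cube",       // exactly one of -cube / -project / -hybrid is required
            "-includeSegments", "false",  // defaults to true; false saves cubes with segments trimmed
            "-destDir", "/tmp/cube_meta"  // omit destDir for a dry run that only logs resource paths
    };
    new CubeMetaExtractor().execute(args);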
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
new file mode 100644
index 0000000..527ef0a
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Extracts cube-related info for debugging/distribution purposes.
+ * TODO: deal with II case
+ */
+public class CubeMetaExtractor extends AbstractApplication {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
+
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
+
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("Set this to true if you want to extract segment info, related dictionaries, etc. Default: true").create("includeSegments");
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("Set this to true if you want to extract job info/outputs too. Default: true").create("includeJobs");
+
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("Specify the destination directory to save the extracted metadata").create("destDir");
+
+ private Options options = null;
+ private KylinConfig kylinConfig;
+ private MetadataManager metadataManager;
+ private ProjectManager projectManager;
+ private HybridManager hybridManager;
+ private CubeManager cubeManager;
+ private StreamingManager streamingManager;
+ private KafkaConfigManager kafkaConfigManager;
+ private CubeDescManager cubeDescManager;
+ private ExecutableDao executableDao;
+ private RealizationRegistry realizationRegistry;
+
+ boolean includeSegments;
+ boolean includeJobs;
+
+ List<String> requiredResources = Lists.newArrayList();
+ List<String> optionalResources = Lists.newArrayList();
+ List<CubeInstance> cubesToTrimAndSave = Lists.newArrayList(); // these cubes need to be saved with their segments skipped
+
+ public CubeMetaExtractor() {
+ options = new Options();
+
+ OptionGroup realizationOrProject = new OptionGroup();
+ realizationOrProject.addOption(OPTION_CUBE);
+ realizationOrProject.addOption(OPTION_PROJECT);
+ realizationOrProject.addOption(OPTION_HYBRID);
+ realizationOrProject.setRequired(true);
+
+ options.addOptionGroup(realizationOrProject);
+ options.addOption(OPTION_INCLUDE_SEGMENTS);
+ options.addOption(OPTION_INCLUDE_JOB);
+ options.addOption(OPTION_DEST);
+
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true;
+ includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
+ String dest = null;
+ if (optionsHelper.hasOption(OPTION_DEST)) {
+ dest = optionsHelper.getOptionValue(OPTION_DEST);
+ }
+
+ kylinConfig = KylinConfig.getInstanceFromEnv();
+ metadataManager = MetadataManager.getInstance(kylinConfig);
+ projectManager = ProjectManager.getInstance(kylinConfig);
+ hybridManager = HybridManager.getInstance(kylinConfig);
+ cubeManager = CubeManager.getInstance(kylinConfig);
+ cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+ streamingManager = StreamingManager.getInstance(kylinConfig);
+ kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+ executableDao = ExecutableDao.getInstance(kylinConfig);
+ realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
+
+ if (optionsHelper.hasOption(OPTION_PROJECT)) {
+ ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT));
+ if (projectInstance == null) {
+ throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist");
+ }
+ addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
+ List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
+ for (RealizationEntry realizationEntry : realizationEntries) {
+ retrieveResourcePath(getRealization(realizationEntry));
+ }
+ } else if (optionsHelper.hasOption(OPTION_CUBE)) {
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+ IRealization realization;
+
+ if ((realization = cubeManager.getRealization(cubeName)) != null) {
+ retrieveResourcePath(realization);
+ } else {
+ throw new IllegalArgumentException("No cube found with name of " + cubeName);
+ }
+ } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
+ String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
+ IRealization realization;
+
+ if ((realization = hybridManager.getRealization(hybridName)) != null) {
+ retrieveResourcePath(realization);
+ } else {
+ throw new IllegalArgumentException("No hybrid found with name of" + hybridName);
+ }
+ }
+
+ executeExtraction(dest);
+ }
+
+ private void executeExtraction(String dest) {
+ logger.info("The resource paths going to be extracted:");
+ for (String s : requiredResources) {
+ logger.info(s + " (required)");
+ }
+ for (String s : optionalResources) {
+ logger.info(s + " (optional)");
+ }
+ for (CubeInstance cube : cubesToTrimAndSave) {
+ logger.info("Cube {} will be trimmed and extracted", cube);
+ }
+
+ if (dest == null) {
+ logger.info("Dest is not set, exit directly without extracting");
+ } else {
+ try {
+ ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+ ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri(dest));
+
+ for (String path : requiredResources) {
+ ResourceTool.copyR(src, dst, path);
+ }
+
+ for (String path : optionalResources) {
+ try {
+ ResourceTool.copyR(src, dst, path);
+ } catch (Exception e) {
+ logger.warn("Exception when copying optional resource {}. May be caused by resource missing. Ignore it.");
+ }
+ }
+
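+ // Keep the original UUID on the trimmed copy so the extracted cube still
+ // matches any references to it in the rest of the copied metadata.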
+ for (CubeInstance cube : cubesToTrimAndSave) {
+ CubeInstance trimmedCube = CubeInstance.getCopyOf(cube);
+ trimmedCube.getSegments().clear();
+ trimmedCube.setUuid(cube.getUuid());
+ dst.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER);
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException("IOException", e);
+ }
+ }
+ }
+
+ private IRealization getRealization(RealizationEntry realizationEntry) {
+ return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
+ }
+
+ private void dealWithStreaming(CubeInstance cube) {
+ for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
+ if (streamingConfig.getCubeName() != null && streamingConfig.getCubeName().equalsIgnoreCase(cube.getName())) {
+ requiredResources.add(StreamingConfig.concatResourcePath(streamingConfig.getName()));
+ requiredResources.add(KafkaConfig.concatResourcePath(streamingConfig.getName()));
+ }
+ }
+ }
+
+ private void retrieveResourcePath(IRealization realization) {
+
+ logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
+
+ if (realization instanceof CubeInstance) {
+ CubeInstance cube = (CubeInstance) realization;
+ String descName = cube.getDescName();
+ CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName);
+ String modelName = cubeDesc.getModelName();
+ DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
+
+ dealWithStreaming(cube);
+
+ for (String tableName : modelDesc.getAllTables()) {
+ addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
+ addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
+ }
+
+ addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
+ addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
+
+ if (includeSegments) {
+ addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
+ for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
+ for (String dictPath : segment.getDictionaryPaths()) {
+ addRequired(requiredResources, dictPath);
+ }
+ for (String snapshotPath : segment.getSnapshotPaths()) {
+ addRequired(requiredResources, snapshotPath);
+ }
+ addRequired(requiredResources, segment.getStatisticsResourcePath());
+
+ if (includeJobs) {
+ String lastJobId = segment.getLastBuildJobID();
+ if (StringUtils.isEmpty(lastJobId)) {
+ throw new RuntimeException("No job exists for segment: " + segment);
+ } else {
+ try {
+ ExecutablePO executablePO = executableDao.getJob(lastJobId);
+ addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
+ addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
+ for (ExecutablePO task : executablePO.getTasks()) {
+ addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
+ addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
+ }
+ } catch (PersistentException e) {
+ throw new RuntimeException("PersistentException", e);
+ }
+ }
+ } else {
+ logger.info("Job info will not be extracted");
+ }
+ }
+ } else {
+ if (includeJobs) {
+ logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
+ }
+
+ cubesToTrimAndSave.add(cube);
+ }
+ } else if (realization instanceof HybridInstance) {
+ HybridInstance hybridInstance = (HybridInstance) realization;
+ addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
+ for (IRealization iRealization : hybridInstance.getRealizations()) {
+ if (iRealization.getType() != RealizationType.CUBE) {
+ throw new RuntimeException("Hybrid " + iRealization.getName() + " contains non cube child " + iRealization.getName() + " with type " + iRealization.getType());
+ }
+ retrieveResourcePath(iRealization);
+ }
+ } else if (realization instanceof IIInstance) {
+ throw new IllegalStateException("Does not support extract II instance or hybrid that contains II");
+ } else {
+ throw new IllegalStateException("Unknown realization type: " + realization.getType());
+ }
+ }
+
+ private void addRequired(List<String> resourcePaths, String record) {
+ logger.info("adding required resource {}", record);
+ resourcePaths.add(record);
+ }
+
+ private void addOptional(List<String> optionalPaths, String record) {
+ logger.info("adding optional resource {}", record);
+ optionalPaths.add(record);
+ }
+
+ public static void main(String[] args) {
+ CubeMetaExtractor extractor = new CubeMetaExtractor();
+ extractor.execute(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 3b8e0c1..489e45a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -115,7 +115,7 @@ public class ResourceTool {
copyR(src, dst, "/");
}
- private static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
+ public static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
ArrayList<String> children = src.listResources(path);
// case of resource (not a folder)
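Making copyR public is what lets the new extractor copy selected resource subtrees instead of mirroring the whole store. A minimal sketch; the destination URI and resource path are assumptions for illustration:

    ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
    ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri("/tmp/meta_backup"));
    // Recurses into folders; copies a single resource otherwise.
    ResourceTool.copyR(src, dst, "/cube_desc");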
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..e0b086d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -34,24 +34,14 @@
package org.apache.kylin.engine.streaming;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
@@ -60,13 +50,6 @@ import org.apache.kylin.metadata.MetadataConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
/**
*/
public class StreamingManager {
@@ -121,18 +104,6 @@ public class StreamingManager {
}
}
- private String formatStreamingConfigPath(String name) {
- return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
- }
-
- private String formatStreamingOutputPath(String streaming, int partition) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
- }
-
- private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
- return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
- }
-
public StreamingConfig getStreamingConfig(String name) {
return streamingMap.get(name);
}
@@ -214,77 +185,12 @@ public class StreamingManager {
if (streamingMap.containsKey(streamingConfig.getName()))
throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
- String path = formatStreamingConfigPath(streamingConfig.getName());
+ String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
streamingMap.put(streamingConfig.getName(), streamingConfig);
return streamingConfig;
}
- public long getOffset(String streaming, int shard) {
- final String resPath = formatStreamingOutputPath(streaming, shard);
- InputStream inputStream = null;
- try {
- final RawResource res = getStore().getResource(resPath);
- if (res == null) {
- return 0;
- } else {
- inputStream = res.inputStream;
- final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
- return Long.parseLong(br.readLine());
- }
- } catch (Exception e) {
- logger.error("error get offset, path:" + resPath, e);
- throw new RuntimeException("error get offset, path:" + resPath, e);
- } finally {
- IOUtils.closeQuietly(inputStream);
- }
- }
-
- public void updateOffset(String streaming, int shard, long offset) {
- Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0");
- final String resPath = formatStreamingOutputPath(streaming, shard);
- try {
- getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()), getStore().getResourceTimestamp(resPath));
- } catch (IOException e) {
- logger.error("error update offset, path:" + resPath, e);
- throw new RuntimeException("error update offset, path:" + resPath, e);
- }
- }
-
- public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
- Collections.sort(partitions);
- final String resPath = formatStreamingOutputPath(streaming, partitions);
- InputStream inputStream = null;
- try {
- RawResource res = getStore().getResource(resPath);
- if (res == null)
- return Collections.emptyMap();
-
- inputStream = res.inputStream;
- final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
- return result;
- } catch (IOException e) {
- logger.error("error get offset, path:" + resPath, e);
- throw new RuntimeException("error get offset, path:" + resPath, e);
- } finally {
- IOUtils.closeQuietly(inputStream);
- }
- }
-
- public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
- List<Integer> partitions = Lists.newLinkedList(offset.keySet());
- Collections.sort(partitions);
- final String resPath = formatStreamingOutputPath(streaming, partitions);
- try {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- mapper.writeValue(baos, offset);
- getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
- } catch (IOException e) {
- logger.error("error update offset, path:" + resPath, e);
- throw new RuntimeException("error update offset, path:" + resPath, e);
- }
- }
-
private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
ResourceStore store = getStore();
StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
@@ -324,8 +230,4 @@ public class StreamingManager {
logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
}
-
- private final ObjectMapper mapper = new ObjectMapper();
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index ac20fc3..1d07f23 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -36,7 +36,6 @@ package org.apache.kylin.source.kafka;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,11 +51,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
/**
*/
public class KafkaConfigManager {
@@ -87,7 +81,7 @@ public class KafkaConfigManager {
return ResourceStore.getStore(this.config);
}
- public static KafkaConfigManager getInstance(KylinConfig config){
+ public static KafkaConfigManager getInstance(KylinConfig config) {
KafkaConfigManager r = CACHE.get(config);
if (r != null) {
return r;
@@ -98,16 +92,16 @@ public class KafkaConfigManager {
if (r != null) {
return r;
}
- try{
- r = new KafkaConfigManager(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one KafkaConfigManager singleton exist");
+ try {
+ r = new KafkaConfigManager(config);
+ CACHE.put(config, r);
+ if (CACHE.size() > 1) {
+ logger.warn("More than one KafkaConfigManager singleton exist");
+ }
+ return r;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
}
- return r;
- } catch (IOException e) {
- throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
- }
}
}
@@ -125,7 +119,7 @@ public class KafkaConfigManager {
public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
// Save Source
- String path = KafkaConfig.getKafkaResourcePath(name);
+ String path = KafkaConfig.concatResourcePath(name);
// Reload the KafkaConfig
KafkaConfig ndesc = loadKafkaConfigAt(path);
@@ -135,14 +129,6 @@ public class KafkaConfigManager {
return ndesc;
}
- private boolean checkExistence(String name) {
- return true;
- }
-
- private String formatStreamingConfigPath(String name) {
- return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + name + ".json";
- }
-
public boolean createKafkaConfig(String name, KafkaConfig config) {
if (config == null || StringUtils.isEmpty(config.getName())) {
@@ -152,7 +138,7 @@ public class KafkaConfigManager {
if (kafkaMap.containsKey(config.getName()))
throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists");
try {
- getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+ getStore().putResource(KafkaConfig.concatResourcePath(name), config, KafkaConfig.SERIALIZER);
kafkaMap.put(config.getName(), config);
return true;
} catch (IOException e) {
@@ -185,7 +171,7 @@ public class KafkaConfigManager {
private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
ResourceStore store = getStore();
- KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class,KAFKA_SERIALIZER );
+ KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, KAFKA_SERIALIZER);
if (StringUtils.isBlank(kafkaConfig.getName())) {
throw new IllegalStateException("KafkaConfig name must not be blank");
@@ -193,7 +179,6 @@ public class KafkaConfigManager {
return kafkaConfig;
}
-
public KafkaConfig getKafkaConfig(String name) {
return kafkaMap.get(name);
}
@@ -203,7 +188,7 @@ public class KafkaConfigManager {
throw new IllegalArgumentException();
}
- String path = formatStreamingConfigPath(kafkaConfig.getName());
+ String path = KafkaConfig.concatResourcePath(kafkaConfig.getName());
getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
}
@@ -214,7 +199,6 @@ public class KafkaConfigManager {
kafkaMap.remove(kafkaConfig.getName());
}
-
private void reloadAllKafkaConfig() throws IOException {
ResourceStore store = getStore();
logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
@@ -245,7 +229,4 @@ public class KafkaConfigManager {
logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
}
- private final ObjectMapper mapper = new ObjectMapper();
- private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 100ca2d..1dce844 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -85,10 +85,10 @@ public class KafkaConfig extends RootPersistentEntity {
private String parserProperties;
public String getResourcePath() {
- return getKafkaResourcePath(name);
+ return concatResourcePath(name);
}
- public static String getKafkaResourcePath(String streamingName) {
+ public static String concatResourcePath(String streamingName) {
return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
}
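The rename aligns KafkaConfig with the concatResourcePath convention used by StreamingConfig and the other metadata classes, which is what lets dealWithStreaming in the extractor derive both paths from one shared name. A sketch, assuming the usual /streaming and /kafka resource roots and a made-up stream name:

    String streamingPath = StreamingConfig.concatResourcePath("my_stream"); // e.g. /streaming/my_stream.json
    String kafkaPath = KafkaConfig.concatResourcePath("my_stream");         // e.g. /kafka/my_stream.json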
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
deleted file mode 100644
index 680dff8..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * extract cube related info for debugging/distributing purpose
- * TODO: deal with II case, deal with Streaming case
- */
-public class CubeMetaExtractor extends AbstractApplication {
-
- private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
- @SuppressWarnings("static-access")
- private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
- @SuppressWarnings("static-access")
- private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc.").create("includeSegments");
- @SuppressWarnings("static-access")
- private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too").create("includeJobs");
-
- @SuppressWarnings("static-access")
- private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir");
-
- private Options options = null;
- private KylinConfig kylinConfig;
- private MetadataManager metadataManager;
- private ProjectManager projectManager;
- private HybridManager hybridManager;
- private CubeManager cubeManager;
- private CubeDescManager cubeDescManager;
- private IIManager iiManager;
- private IIDescManager iiDescManager;
- private ExecutableDao executableDao;
- RealizationRegistry realizationRegistry;
-
- public CubeMetaExtractor() {
- options = new Options();
-
- OptionGroup realizationOrProject = new OptionGroup();
- realizationOrProject.addOption(OPTION_CUBE);
- realizationOrProject.addOption(OPTION_PROJECT);
- realizationOrProject.addOption(OPTION_HYBRID);
- realizationOrProject.setRequired(true);
-
- options.addOptionGroup(realizationOrProject);
- options.addOption(OPTION_INCLUDE_SEGMENTS);
- options.addOption(OPTION_INCLUDE_JOB);
- options.addOption(OPTION_DEST);
-
- }
-
- @Override
- protected Options getOptions() {
- return options;
- }
-
- @Override
- protected void execute(OptionsHelper optionsHelper) throws Exception {
- boolean includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true;
- boolean includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
- String dest = null;
- if (optionsHelper.hasOption(OPTION_DEST)) {
- dest = optionsHelper.getOptionValue(OPTION_DEST);
- }
-
- if (!includeSegments) {
- throw new RuntimeException("Does not support skip segments for now");
- }
-
- kylinConfig = KylinConfig.getInstanceFromEnv();
- metadataManager = MetadataManager.getInstance(kylinConfig);
- projectManager = ProjectManager.getInstance(kylinConfig);
- hybridManager = HybridManager.getInstance(kylinConfig);
- cubeManager = CubeManager.getInstance(kylinConfig);
- cubeDescManager = CubeDescManager.getInstance(kylinConfig);
- iiManager = IIManager.getInstance(kylinConfig);
- iiDescManager = IIDescManager.getInstance(kylinConfig);
- executableDao = ExecutableDao.getInstance(kylinConfig);
- realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
-
- List<String> requiredResources = Lists.newArrayList();
- List<String> optionalResources = Lists.newArrayList();
-
- if (optionsHelper.hasOption(OPTION_PROJECT)) {
- ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT));
- if (projectInstance == null) {
- throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist");
- }
- addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
- List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
- for (RealizationEntry realizationEntry : realizationEntries) {
- retrieveResourcePath(getRealization(realizationEntry), includeSegments, includeJobs, requiredResources, optionalResources);
- }
- } else if (optionsHelper.hasOption(OPTION_CUBE)) {
- String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
- IRealization realization;
-
- if ((realization = cubeManager.getRealization(cubeName)) != null) {
- retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
- } else {
- throw new IllegalArgumentException("No cube found with name of " + cubeName);
- }
- } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
- String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
- IRealization realization;
-
- if ((realization = hybridManager.getRealization(hybridName)) != null) {
- retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
- } else {
- throw new IllegalArgumentException("No hybrid found with name of" + hybridName);
- }
- }
-
- executeExtraction(requiredResources, optionalResources, dest);
- }
-
- private void executeExtraction(List<String> requiredPaths, List<String> optionalPaths, String dest) {
- logger.info("The resource paths going to be extracted:");
- for (String s : requiredPaths) {
- logger.info(s + "(required)");
- }
- for (String s : optionalPaths) {
- logger.info(s + "(optional)");
- }
-
- if (dest == null) {
- logger.info("Dest is not set, exit directly without extracting");
- } else {
- try {
- ResourceTool.copy(KylinConfig.getInstanceFromEnv(), KylinConfig.createInstanceFromUri(dest));
- } catch (IOException e) {
- throw new RuntimeException("IOException", e);
- }
- }
- }
-
- private IRealization getRealization(RealizationEntry realizationEntry) {
- return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
- }
-
- private void retrieveResourcePath(IRealization realization, boolean includeSegments, boolean includeJobs, List<String> requiredResources, List<String> optionalResources) {
-
- logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
-
- if (realization instanceof CubeInstance) {
- CubeInstance cube = (CubeInstance) realization;
- String descName = cube.getDescName();
- CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName);
- String modelName = cubeDesc.getModelName();
- DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
-
- for (String tableName : modelDesc.getAllTables()) {
- addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
- addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
- }
-
- addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
- addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
-
- if (includeSegments) {
- addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
- for (CubeSegment segment : cube.getSegments()) {
- for (String dictPat : segment.getDictionaryPaths()) {
- addRequired(requiredResources, dictPat);
- }
- for (String snapshotPath : segment.getSnapshotPaths()) {
- addRequired(requiredResources, snapshotPath);
- }
- addRequired(requiredResources, segment.getStatisticsResourcePath());
-
- if (includeJobs) {
- String lastJobId = segment.getLastBuildJobID();
- if (!StringUtils.isEmpty(lastJobId)) {
- logger.warn("No job exist for segment {}", segment);
- } else {
- try {
- ExecutablePO executablePO = executableDao.getJob(lastJobId);
- addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
- addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
- for (ExecutablePO task : executablePO.getTasks()) {
- addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
- addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
- }
- } catch (PersistentException e) {
- throw new RuntimeException("PersistentException", e);
- }
- }
- } else {
- logger.info("Job info will not be extracted");
- }
- }
- } else {
- if (includeJobs) {
- logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
- }
-
- throw new IllegalStateException("Does not support skip segments now");
- }
- } else if (realization instanceof HybridInstance) {
- HybridInstance hybridInstance = (HybridInstance) realization;
- addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
- for (IRealization iRealization : hybridInstance.getRealizations()) {
- retrieveResourcePath(iRealization, includeSegments, includeJobs, requiredResources, optionalResources);
- }
- } else if (realization instanceof IIInstance) {
- throw new IllegalStateException("Does not support extract II instance or hybrid that contains II");
- } else {
- throw new IllegalStateException("Unknown realization type: " + realization.getType());
- }
- }
-
- private void addRequired(List<String> resourcePaths, String record) {
- logger.info("adding required resource {}", record);
- resourcePaths.add(record);
- }
-
- private void addOptional(List<String> optionalPaths, String record) {
- logger.info("adding optional resource {}", record);
- optionalPaths.add(record);
- }
-
- public static void main(String[] args) {
- CubeMetaExtractor extractor = new CubeMetaExtractor();
- extractor.execute(args);
- }
-}