Posted to commits@kylin.apache.org by li...@apache.org on 2016/03/10 08:50:12 UTC

[09/50] [abbrv] kylin git commit: KYLIN-1340 CubeMetaExtractor support streaming case and skip segments

KYLIN-1340 CubeMetaExtractor support streaming case and skip segments
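
A minimal usage sketch for the new behavior (the option names come from the Options defined in the diff below; the cube name and destination directory are hypothetical):

    // Hypothetical invocation: extract one cube's metadata while skipping
    // segments (the capability this commit adds), saving to a local directory.
    public class ExtractCubeMetaExample {
        public static void main(String[] args) {
            org.apache.kylin.job.CubeMetaExtractor.main(new String[] {
                    "-cube", "sample_cube",        // hypothetical cube name
                    "-includeSegments", "false",   // skip segment info (new in this commit)
                    "-destDir", "/tmp/cube_meta"   // hypothetical destination metadata URI
            });
        }
    }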


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c08ded6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c08ded6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c08ded6

Branch: refs/heads/master
Commit: 4c08ded63f78aad93eefa9814d48af2486725967
Parents: 2e1d2f6
Author: honma <ho...@ebay.com>
Authored: Wed Feb 24 15:45:38 2016 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Feb 26 17:54:37 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/CubeMetaExtractor.java | 327 +++++++++++++++++++
 .../kylin/common/persistence/ResourceTool.java  |   2 +-
 .../engine/streaming/StreamingManager.java      | 100 +-----
 .../kylin/source/kafka/KafkaConfigManager.java  |  47 +--
 .../kylin/source/kafka/config/KafkaConfig.java  |   4 +-
 .../storage/hbase/util/CubeMetaExtractor.java   | 284 ----------------
 6 files changed, 345 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
new file mode 100644
index 0000000..527ef0a
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/CubeMetaExtractor.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Extracts cube-related info for debugging/distribution purposes.
+ * TODO: deal with II case
+ */
+public class CubeMetaExtractor extends AbstractApplication {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("Set this to true to extract segment info, related dictionaries, etc. Defaults to true.").create("includeSegments");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("Set this to true to extract job info/outputs too. Defaults to true.").create("includeJobs");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("Specify the destination directory for saving the related metadata").create("destDir");
+
+    private Options options = null;
+    private KylinConfig kylinConfig;
+    private MetadataManager metadataManager;
+    private ProjectManager projectManager;
+    private HybridManager hybridManager;
+    private CubeManager cubeManager;
+    private StreamingManager streamingManager;
+    private KafkaConfigManager kafkaConfigManager;
+    private CubeDescManager cubeDescManager;
+    private ExecutableDao executableDao;
+    private RealizationRegistry realizationRegistry;
+
+    boolean includeSegments;
+    boolean includeJobs;
+
+    List<String> requiredResources = Lists.newArrayList();
+    List<String> optionalResources = Lists.newArrayList();
+    List<CubeInstance> cubesToTrimAndSave = Lists.newArrayList(); // these cubes need to be saved with their segments skipped
+
+    public CubeMetaExtractor() {
+        options = new Options();
+
+        OptionGroup realizationOrProject = new OptionGroup();
+        realizationOrProject.addOption(OPTION_CUBE);
+        realizationOrProject.addOption(OPTION_PROJECT);
+        realizationOrProject.addOption(OPTION_HYBRID);
+        realizationOrProject.setRequired(true);
+
+        options.addOptionGroup(realizationOrProject);
+        options.addOption(OPTION_INCLUDE_SEGMENTS);
+        options.addOption(OPTION_INCLUDE_JOB);
+        options.addOption(OPTION_DEST);
+
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true;
+        includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
+        String dest = null;
+        if (optionsHelper.hasOption(OPTION_DEST)) {
+            dest = optionsHelper.getOptionValue(OPTION_DEST);
+        }
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        metadataManager = MetadataManager.getInstance(kylinConfig);
+        projectManager = ProjectManager.getInstance(kylinConfig);
+        hybridManager = HybridManager.getInstance(kylinConfig);
+        cubeManager = CubeManager.getInstance(kylinConfig);
+        cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+        streamingManager = StreamingManager.getInstance(kylinConfig);
+        kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
+        executableDao = ExecutableDao.getInstance(kylinConfig);
+        realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
+
+        if (optionsHelper.hasOption(OPTION_PROJECT)) {
+            ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT));
+            if (projectInstance == null) {
+                throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist");
+            }
+            addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
+            List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
+            for (RealizationEntry realizationEntry : realizationEntries) {
+                retrieveResourcePath(getRealization(realizationEntry));
+            }
+        } else if (optionsHelper.hasOption(OPTION_CUBE)) {
+            String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+            IRealization realization;
+
+            if ((realization = cubeManager.getRealization(cubeName)) != null) {
+                retrieveResourcePath(realization);
+            } else {
+                throw new IllegalArgumentException("No cube found with name " + cubeName);
+            }
+        } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
+            String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
+            IRealization realization;
+
+            if ((realization = hybridManager.getRealization(hybridName)) != null) {
+                retrieveResourcePath(realization);
+            } else {
+                throw new IllegalArgumentException("No hybrid found with name " + hybridName);
+            }
+        }
+
+        executeExtraction(dest);
+    }
+
+    private void executeExtraction(String dest) {
+        logger.info("The resource paths going to be extracted:");
+        for (String s : requiredResources) {
+            logger.info(s + "(required)");
+        }
+        for (String s : optionalResources) {
+            logger.info(s + "(optional)");
+        }
+        for (CubeInstance cube : cubesToTrimAndSave) {
+            logger.info("Cube {} will be trimmed and extracted", cube);
+        }
+
+        if (dest == null) {
+            logger.info("Dest is not set, exiting without extracting");
+        } else {
+            try {
+                ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+                ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri(dest));
+
+                for (String path : requiredResources) {
+                    ResourceTool.copyR(src, dst, path);
+                }
+
+                for (String path : optionalResources) {
+                    try {
+                        ResourceTool.copyR(src, dst, path);
+                    } catch (Exception e) {
+                        logger.warn("Exception when copying optional resource {}. May be caused by a missing resource; ignoring it.", path, e);
+                    }
+                }
+
+                for (CubeInstance cube : cubesToTrimAndSave) {
+                    CubeInstance trimmedCube = CubeInstance.getCopyOf(cube);
+                    trimmedCube.getSegments().clear();
+                    trimmedCube.setUuid(cube.getUuid());
+                    dst.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER);
+                }
+
+            } catch (IOException e) {
+                throw new RuntimeException("IOException", e);
+            }
+        }
+    }
+
+    private IRealization getRealization(RealizationEntry realizationEntry) {
+        return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
+    }
+
+    private void dealWithStreaming(CubeInstance cube) {
+        for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
+            if (streamingConfig.getCubeName() != null && streamingConfig.getCubeName().equalsIgnoreCase(cube.getName())) {
+                requiredResources.add(StreamingConfig.concatResourcePath(streamingConfig.getName()));
+                requiredResources.add(KafkaConfig.concatResourcePath(streamingConfig.getName()));
+            }
+        }
+    }
+
+    private void retrieveResourcePath(IRealization realization) {
+
+        logger.info("Dealing with realization {} of type {}", realization.getName(), realization.getType());
+
+        if (realization instanceof CubeInstance) {
+            CubeInstance cube = (CubeInstance) realization;
+            String descName = cube.getDescName();
+            CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName);
+            String modelName = cubeDesc.getModelName();
+            DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
+
+            dealWithStreaming(cube);
+
+            for (String tableName : modelDesc.getAllTables()) {
+                addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
+                addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
+            }
+
+            addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
+            addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
+
+            if (includeSegments) {
+                addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
+                for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
+                    for (String dictPath : segment.getDictionaryPaths()) {
+                        addRequired(requiredResources, dictPath);
+                    }
+                    for (String snapshotPath : segment.getSnapshotPaths()) {
+                        addRequired(requiredResources, snapshotPath);
+                    }
+                    addRequired(requiredResources, segment.getStatisticsResourcePath());
+
+                    if (includeJobs) {
+                        String lastJobId = segment.getLastBuildJobID();
+                        if (StringUtils.isEmpty(lastJobId)) {
+                            throw new RuntimeException("No job exists for segment: " + segment);
+                        } else {
+                            try {
+                                ExecutablePO executablePO = executableDao.getJob(lastJobId);
+                                addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
+                                addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
+                                for (ExecutablePO task : executablePO.getTasks()) {
+                                    addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
+                                    addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
+                                }
+                            } catch (PersistentException e) {
+                                throw new RuntimeException("PersistentException", e);
+                            }
+                        }
+                    } else {
+                        logger.info("Job info will not be extracted");
+                    }
+                }
+            } else {
+                if (includeJobs) {
+                    logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
+                }
+
+                cubesToTrimAndSave.add(cube);
+            }
+        } else if (realization instanceof HybridInstance) {
+            HybridInstance hybridInstance = (HybridInstance) realization;
+            addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
+            for (IRealization iRealization : hybridInstance.getRealizations()) {
+                if (iRealization.getType() != RealizationType.CUBE) {
+                    throw new RuntimeException("Hybrid " + hybridInstance.getName() + " contains non-cube child " + iRealization.getName() + " with type " + iRealization.getType());
+                }
+                retrieveResourcePath(iRealization);
+            }
+        } else if (realization instanceof IIInstance) {
+            throw new IllegalStateException("Extracting an II instance, or a hybrid that contains II, is not supported");
+        } else {
+            throw new IllegalStateException("Unknown realization type: " + realization.getType());
+        }
+    }
+
+    private void addRequired(List<String> resourcePaths, String record) {
+        logger.info("adding required resource {}", record);
+        resourcePaths.add(record);
+    }
+
+    private void addOptional(List<String> optionalPaths, String record) {
+        logger.info("adding optional resource {}", record);
+        optionalPaths.add(record);
+    }
+
+    public static void main(String[] args) {
+        CubeMetaExtractor extractor = new CubeMetaExtractor();
+        extractor.execute(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 3b8e0c1..489e45a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -115,7 +115,7 @@ public class ResourceTool {
         copyR(src, dst, "/");
     }
 
-    private static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
+    public static void copyR(ResourceStore src, ResourceStore dst, String path) throws IOException {
         ArrayList<String> children = src.listResources(path);
 
         // case of resource (not a folder)
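
The visibility change above is what lets CubeMetaExtractor copy resources selectively instead of cloning the whole store. A minimal sketch of that pattern, assuming the destination URI and resource paths shown are hypothetical:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.common.persistence.ResourceStore;
    import org.apache.kylin.common.persistence.ResourceTool;

    public class SelectiveCopyExample {
        public static void main(String[] args) throws Exception {
            ResourceStore src = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
            ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri("/tmp/meta_backup"));
            // copyR recurses: a single resource is copied directly,
            // a folder path is walked and its children copied one by one.
            ResourceTool.copyR(src, dst, "/cube_desc/sample_cube.json"); // hypothetical single resource
            ResourceTool.copyR(src, dst, "/table");                      // hypothetical folder
        }
    }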

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index af04a11..e0b086d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -34,24 +34,14 @@
 
 package org.apache.kylin.engine.streaming;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
@@ -60,13 +50,6 @@ import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 /**
  */
 public class StreamingManager {
@@ -121,18 +104,6 @@ public class StreamingManager {
         }
     }
 
-    private String formatStreamingConfigPath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, int partition) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
-    }
-
     public StreamingConfig getStreamingConfig(String name) {
         return streamingMap.get(name);
     }
@@ -214,77 +185,12 @@ public class StreamingManager {
         if (streamingMap.containsKey(streamingConfig.getName()))
             throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
 
-        String path = formatStreamingConfigPath(streamingConfig.getName());
+        String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
         getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
         streamingMap.put(streamingConfig.getName(), streamingConfig);
         return streamingConfig;
     }
 
-    public long getOffset(String streaming, int shard) {
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        InputStream inputStream = null; 
-        try {
-            final RawResource res = getStore().getResource(resPath);
-            if (res == null) {
-                return 0;
-            } else {
-            	inputStream = res.inputStream;
-                final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
-                return Long.parseLong(br.readLine());
-            }
-        } catch (Exception e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        } finally {
-        	IOUtils.closeQuietly(inputStream);
-        }
-    }
-
-    public void updateOffset(String streaming, int shard, long offset) {
-        Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0");
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        try {
-            getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
-    public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        InputStream inputStream = null;
-        try {
-        	RawResource res = getStore().getResource(resPath);
-        	if (res == null)
-        		return Collections.emptyMap();
-        	
-        	inputStream = res.inputStream;
-            final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
-            return result;
-        } catch (IOException e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        } finally {
-        	IOUtils.closeQuietly(inputStream);
-        }
-    }
-
-    public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
-        List<Integer> partitions = Lists.newLinkedList(offset.keySet());
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            mapper.writeValue(baos, offset);
-            getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
     private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
         ResourceStore store = getStore();
         StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
@@ -324,8 +230,4 @@ public class StreamingManager {
 
         logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
     }
-
-    private final ObjectMapper mapper = new ObjectMapper();
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
 }
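
The removed formatStreamingConfigPath helper is replaced by StreamingConfig.concatResourcePath, converging on one path convention per metadata type. A sketch of the resulting symmetry, assuming StreamingConfig mirrors the KafkaConfig.concatResourcePath shown further below and that the resource roots resolve to "/streaming" and "/kafka" (the name is hypothetical):

    import org.apache.kylin.engine.streaming.StreamingConfig;
    import org.apache.kylin.source.kafka.config.KafkaConfig;

    public class ResourcePathExample {
        public static void main(String[] args) {
            String name = "my_stream"; // hypothetical streaming config name
            // One name yields both resource paths; CubeMetaExtractor.dealWithStreaming()
            // above relies on exactly this symmetry when collecting required resources.
            System.out.println(StreamingConfig.concatResourcePath(name)); // e.g. /streaming/my_stream.json
            System.out.println(KafkaConfig.concatResourcePath(name));     // e.g. /kafka/my_stream.json
        }
    }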

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index ac20fc3..1d07f23 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -36,7 +36,6 @@ package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -52,11 +51,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
 /**
  */
 public class KafkaConfigManager {
@@ -87,7 +81,7 @@ public class KafkaConfigManager {
         return ResourceStore.getStore(this.config);
     }
 
-    public static KafkaConfigManager getInstance(KylinConfig config){
+    public static KafkaConfigManager getInstance(KylinConfig config) {
         KafkaConfigManager r = CACHE.get(config);
         if (r != null) {
             return r;
@@ -98,16 +92,16 @@ public class KafkaConfigManager {
             if (r != null) {
                 return r;
             }
-            try{
-            r = new KafkaConfigManager(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one KafkaConfigManager singleton exist");
+            try {
+                r = new KafkaConfigManager(config);
+                CACHE.put(config, r);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one KafkaConfigManager singleton exist");
+                }
+                return r;
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
             }
-            return r;
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to init KafkaConfigManager from " + config, e);
-        }
         }
     }
 
@@ -125,7 +119,7 @@ public class KafkaConfigManager {
     public KafkaConfig reloadKafkaConfigLocal(String name) throws IOException {
 
         // Save Source
-        String path = KafkaConfig.getKafkaResourcePath(name);
+        String path = KafkaConfig.concatResourcePath(name);
 
         // Reload the KafkaConfig
         KafkaConfig ndesc = loadKafkaConfigAt(path);
@@ -135,14 +129,6 @@ public class KafkaConfigManager {
         return ndesc;
     }
 
-    private boolean checkExistence(String name) {
-        return true;
-    }
-
-    private String formatStreamingConfigPath(String name) {
-        return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
     public boolean createKafkaConfig(String name, KafkaConfig config) {
 
         if (config == null || StringUtils.isEmpty(config.getName())) {
@@ -152,7 +138,7 @@ public class KafkaConfigManager {
         if (kafkaMap.containsKey(config.getName()))
             throw new IllegalArgumentException("KafkaConfig '" + config.getName() + "' already exists");
         try {
-            getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+            getStore().putResource(KafkaConfig.concatResourcePath(name), config, KafkaConfig.SERIALIZER);
             kafkaMap.put(config.getName(), config);
             return true;
         } catch (IOException e) {
@@ -185,7 +171,7 @@ public class KafkaConfigManager {
 
     private KafkaConfig loadKafkaConfigAt(String path) throws IOException {
         ResourceStore store = getStore();
-        KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class,KAFKA_SERIALIZER );
+        KafkaConfig kafkaConfig = store.getResource(path, KafkaConfig.class, KAFKA_SERIALIZER);
 
         if (StringUtils.isBlank(kafkaConfig.getName())) {
             throw new IllegalStateException("KafkaConfig name must not be blank");
@@ -193,7 +179,6 @@ public class KafkaConfigManager {
         return kafkaConfig;
     }
 
-
     public KafkaConfig getKafkaConfig(String name) {
         return kafkaMap.get(name);
     }
@@ -203,7 +188,7 @@ public class KafkaConfigManager {
             throw new IllegalArgumentException();
         }
 
-        String path = formatStreamingConfigPath(kafkaConfig.getName());
+        String path = KafkaConfig.concatResourcePath(kafkaConfig.getName());
         getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
     }
 
@@ -214,7 +199,6 @@ public class KafkaConfigManager {
         kafkaMap.remove(kafkaConfig.getName());
     }
 
-
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
         logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
@@ -245,7 +229,4 @@ public class KafkaConfigManager {
         logger.debug("Loaded " + kafkaMap.size() + " KafkaConfig(s)");
     }
 
-    private final ObjectMapper mapper = new ObjectMapper();
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 100ca2d..1dce844 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -85,10 +85,10 @@ public class KafkaConfig extends RootPersistentEntity {
     private String parserProperties;
     
     public String getResourcePath() {
-        return getKafkaResourcePath(name);
+        return concatResourcePath(name);
     }
 
-    public static String getKafkaResourcePath(String streamingName) {
+    public static String concatResourcePath(String streamingName) {
         return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c08ded6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
deleted file mode 100644
index 680dff8..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMetaExtractor.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.OptionGroup;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceTool;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationRegistry;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * extract cube related info for debugging/distributing purpose
- * TODO: deal with II case, deal with Streaming case
- */
-public class CubeMetaExtractor extends AbstractApplication {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeMetaExtractor.class);
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("Specify which cube to extract").create("cube");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false).withDescription("Specify which hybrid to extract").create("hybrid");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false).withDescription("Specify realizations in which project to extract").create("project");
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_INCLUDE_SEGMENTS = OptionBuilder.withArgName("includeSegments").hasArg().isRequired(false).withDescription("set this to true if want extract the segments info, related dicts, etc.").create("includeSegments");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_INCLUDE_JOB = OptionBuilder.withArgName("includeJobs").hasArg().isRequired(false).withDescription("set this to true if want to extract job info/outputs too").create("includeJobs");
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(false).withDescription("specify the dest dir to save the related metadata").create("destDir");
-
-    private Options options = null;
-    private KylinConfig kylinConfig;
-    private MetadataManager metadataManager;
-    private ProjectManager projectManager;
-    private HybridManager hybridManager;
-    private CubeManager cubeManager;
-    private CubeDescManager cubeDescManager;
-    private IIManager iiManager;
-    private IIDescManager iiDescManager;
-    private ExecutableDao executableDao;
-    RealizationRegistry realizationRegistry;
-
-    public CubeMetaExtractor() {
-        options = new Options();
-
-        OptionGroup realizationOrProject = new OptionGroup();
-        realizationOrProject.addOption(OPTION_CUBE);
-        realizationOrProject.addOption(OPTION_PROJECT);
-        realizationOrProject.addOption(OPTION_HYBRID);
-        realizationOrProject.setRequired(true);
-
-        options.addOptionGroup(realizationOrProject);
-        options.addOption(OPTION_INCLUDE_SEGMENTS);
-        options.addOption(OPTION_INCLUDE_JOB);
-        options.addOption(OPTION_DEST);
-
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        boolean includeSegments = optionsHelper.hasOption(OPTION_INCLUDE_SEGMENTS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_SEGMENTS)) : true;
-        boolean includeJobs = optionsHelper.hasOption(OPTION_INCLUDE_JOB) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_JOB)) : true;
-        String dest = null;
-        if (optionsHelper.hasOption(OPTION_DEST)) {
-            dest = optionsHelper.getOptionValue(OPTION_DEST);
-        }
-
-        if (!includeSegments) {
-            throw new RuntimeException("Does not support skip segments for now");
-        }
-
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        metadataManager = MetadataManager.getInstance(kylinConfig);
-        projectManager = ProjectManager.getInstance(kylinConfig);
-        hybridManager = HybridManager.getInstance(kylinConfig);
-        cubeManager = CubeManager.getInstance(kylinConfig);
-        cubeDescManager = CubeDescManager.getInstance(kylinConfig);
-        iiManager = IIManager.getInstance(kylinConfig);
-        iiDescManager = IIDescManager.getInstance(kylinConfig);
-        executableDao = ExecutableDao.getInstance(kylinConfig);
-        realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
-
-        List<String> requiredResources = Lists.newArrayList();
-        List<String> optionalResources = Lists.newArrayList();
-
-        if (optionsHelper.hasOption(OPTION_PROJECT)) {
-            ProjectInstance projectInstance = projectManager.getProject(optionsHelper.getOptionValue(OPTION_PROJECT));
-            if (projectInstance == null) {
-                throw new IllegalArgumentException("Project " + optionsHelper.getOptionValue(OPTION_PROJECT) + " does not exist");
-            }
-            addRequired(requiredResources, ProjectInstance.concatResourcePath(projectInstance.getName()));
-            List<RealizationEntry> realizationEntries = projectInstance.getRealizationEntries();
-            for (RealizationEntry realizationEntry : realizationEntries) {
-                retrieveResourcePath(getRealization(realizationEntry), includeSegments, includeJobs, requiredResources, optionalResources);
-            }
-        } else if (optionsHelper.hasOption(OPTION_CUBE)) {
-            String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
-            IRealization realization;
-
-            if ((realization = cubeManager.getRealization(cubeName)) != null) {
-                retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
-            } else {
-                throw new IllegalArgumentException("No cube found with name of " + cubeName);
-            }
-        } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
-            String hybridName = optionsHelper.getOptionValue(OPTION_HYBRID);
-            IRealization realization;
-
-            if ((realization = hybridManager.getRealization(hybridName)) != null) {
-                retrieveResourcePath(realization, includeSegments, includeJobs, requiredResources, optionalResources);
-            } else {
-                throw new IllegalArgumentException("No hybrid found with name of" + hybridName);
-            }
-        }
-
-        executeExtraction(requiredResources, optionalResources, dest);
-    }
-
-    private void executeExtraction(List<String> requiredPaths, List<String> optionalPaths, String dest) {
-        logger.info("The resource paths going to be extracted:");
-        for (String s : requiredPaths) {
-            logger.info(s + "(required)");
-        }
-        for (String s : optionalPaths) {
-            logger.info(s + "(optional)");
-        }
-
-        if (dest == null) {
-            logger.info("Dest is not set, exit directly without extracting");
-        } else {
-            try {
-                ResourceTool.copy(KylinConfig.getInstanceFromEnv(), KylinConfig.createInstanceFromUri(dest));
-            } catch (IOException e) {
-                throw new RuntimeException("IOException", e);
-            }
-        }
-    }
-
-    private IRealization getRealization(RealizationEntry realizationEntry) {
-        return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
-    }
-
-    private void retrieveResourcePath(IRealization realization, boolean includeSegments, boolean includeJobs, List<String> requiredResources, List<String> optionalResources) {
-
-        logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
-
-        if (realization instanceof CubeInstance) {
-            CubeInstance cube = (CubeInstance) realization;
-            String descName = cube.getDescName();
-            CubeDesc cubeDesc = cubeDescManager.getCubeDesc(descName);
-            String modelName = cubeDesc.getModelName();
-            DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
-
-            for (String tableName : modelDesc.getAllTables()) {
-                addRequired(requiredResources, TableDesc.concatResourcePath(tableName));
-                addOptional(optionalResources, TableDesc.concatExdResourcePath(tableName));
-            }
-
-            addRequired(requiredResources, DataModelDesc.concatResourcePath(modelDesc.getName()));
-            addRequired(requiredResources, CubeDesc.concatResourcePath(cubeDesc.getName()));
-
-            if (includeSegments) {
-                addRequired(requiredResources, CubeInstance.concatResourcePath(cube.getName()));
-                for (CubeSegment segment : cube.getSegments()) {
-                    for (String dictPat : segment.getDictionaryPaths()) {
-                        addRequired(requiredResources, dictPat);
-                    }
-                    for (String snapshotPath : segment.getSnapshotPaths()) {
-                        addRequired(requiredResources, snapshotPath);
-                    }
-                    addRequired(requiredResources, segment.getStatisticsResourcePath());
-
-                    if (includeJobs) {
-                        String lastJobId = segment.getLastBuildJobID();
-                        if (!StringUtils.isEmpty(lastJobId)) {
-                            logger.warn("No job exist for segment {}", segment);
-                        } else {
-                            try {
-                                ExecutablePO executablePO = executableDao.getJob(lastJobId);
-                                addRequired(requiredResources, ExecutableDao.pathOfJob(lastJobId));
-                                addRequired(requiredResources, ExecutableDao.pathOfJobOutput(lastJobId));
-                                for (ExecutablePO task : executablePO.getTasks()) {
-                                    addRequired(requiredResources, ExecutableDao.pathOfJob(task.getUuid()));
-                                    addRequired(requiredResources, ExecutableDao.pathOfJobOutput(task.getUuid()));
-                                }
-                            } catch (PersistentException e) {
-                                throw new RuntimeException("PersistentException", e);
-                            }
-                        }
-                    } else {
-                        logger.info("Job info will not be extracted");
-                    }
-                }
-            } else {
-                if (includeJobs) {
-                    logger.warn("It's useless to set includeJobs to true when includeSegments is set to false");
-                }
-
-                throw new IllegalStateException("Does not support skip segments now");
-            }
-        } else if (realization instanceof HybridInstance) {
-            HybridInstance hybridInstance = (HybridInstance) realization;
-            addRequired(requiredResources, HybridInstance.concatResourcePath(hybridInstance.getName()));
-            for (IRealization iRealization : hybridInstance.getRealizations()) {
-                retrieveResourcePath(iRealization, includeSegments, includeJobs, requiredResources, optionalResources);
-            }
-        } else if (realization instanceof IIInstance) {
-            throw new IllegalStateException("Does not support extract II instance or hybrid that contains II");
-        } else {
-            throw new IllegalStateException("Unknown realization type: " + realization.getType());
-        }
-    }
-
-    private void addRequired(List<String> resourcePaths, String record) {
-        logger.info("adding required resource {}", record);
-        resourcePaths.add(record);
-    }
-
-    private void addOptional(List<String> optionalPaths, String record) {
-        logger.info("adding optional resource {}", record);
-        optionalPaths.add(record);
-    }
-
-    public static void main(String[] args) {
-        CubeMetaExtractor extractor = new CubeMetaExtractor();
-        extractor.execute(args);
-    }
-}