You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2019/03/20 14:44:31 UTC

[kylin] branch engine-flink updated: KYLIN-3896 Implement IFlinkOutput based on HBase

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/engine-flink by this push:
     new ea34ffc  KYLIN-3896 Implement IFlinkOutput based on HBase
ea34ffc is described below

commit ea34ffceaef91beba81a95087ab998e6a2f736c4
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Mar 20 13:47:52 2019 +0800

    KYLIN-3896 Implement IFlinkOutput based on HBase
---
 storage-hbase/pom.xml                              |  5 ++
 .../apache/kylin/storage/hbase/HBaseStorage.java   |  4 +
 .../hbase/steps/HBaseFlinkOutputTransition.java    | 97 ++++++++++++++++++++++
 3 files changed, 106 insertions(+)

diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 0403700..6f38ed6 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -52,6 +52,11 @@
             <artifactId>kylin-engine-spark</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-flink</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index ded6598..2a30444 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.flink.IFlinkOutput;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.spark.ISparkOutput;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -31,6 +32,7 @@ import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.storage.IStorage;
 import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.hbase.steps.HBaseFlinkOutputTransition;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
 import org.apache.kylin.storage.hbase.steps.HBaseSparkOutputTransition;
 
@@ -89,6 +91,8 @@ public class HBaseStorage implements IStorage {
             return (I) new HBaseMROutput2Transition();
         } else if (engineInterface == ISparkOutput.class) {
             return (I) new HBaseSparkOutputTransition();
+        } else if (engineInterface == IFlinkOutput.class) {
+            return (I) new HBaseFlinkOutputTransition();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java
new file mode 100644
index 0000000..a2a05c5
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseFlinkOutputTransition.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.flink.IFlinkOutput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * This "Transition" impl generates cuboid files and then converts them to HFiles.
+ * The additional step slows down the build process, but the gain is that merge
+ * can read from HDFS instead of going through HBase region servers. See KYLIN-1007.
+ * 
+ * This is transitional because finally we want to merge from HTable snapshot.
+ * However multiple snapshots as MR input is only supported by HBase 1.x.
+ * Before most users upgrade to latest HBase, they can only use this transitional
+ * cuboid file solution.
+ */
+public class HBaseFlinkOutputTransition implements IFlinkOutput {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(HBaseFlinkOutputTransition.class);
+
+    /** Builds the hooks that append this storage's HBase steps to a cubing job for {@code seg}. */
+    @Override
+    public IFlinkBatchCubingOutputSide getBatchCubingOutputSide(final CubeSegment seg) {
+        final HBaseMRSteps hbaseSteps = new HBaseMRSteps(seg);
+        return new IFlinkBatchCubingOutputSide() {
+            @Override
+            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(hbaseSteps.createCreateHTableStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                final String jobId = jobFlow.getId();
+                jobFlow.addTask(hbaseSteps.createConvertCuboidToHfileStep(jobId));
+                jobFlow.addTask(hbaseSteps.createBulkLoadStep(jobId));
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                // intentionally empty: no cleanup step is added on the build path
+            }
+        };
+    }
+
+    /** Builds the hooks that append this storage's HBase steps to a merge job for {@code seg}. */
+    @Override
+    public IFlinkBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
+        final HBaseMRSteps hbaseSteps = new HBaseMRSteps(seg);
+        return new IFlinkBatchMergeOutputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(hbaseSteps.createCreateHTableStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
+                    DefaultChainedExecutable jobFlow) {
+                final String jobId = jobFlow.getId();
+                jobFlow.addTask(hbaseSteps.createConvertCuboidToHfileStep(jobId));
+                jobFlow.addTask(hbaseSteps.createBulkLoadStep(jobId));
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+                hbaseSteps.addMergingGarbageCollectionSteps(jobFlow);
+            }
+        };
+    }
+
+    /** Always returns {@code null}: no optimize-side output is implemented here. */
+    public IFlinkBatchOptimizeOutputSide getBatchOptimizeOutputSide(final CubeSegment seg) {
+        return null;
+    }
+}
\ No newline at end of file