You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/01 10:50:04 UTC

incubator-kylin git commit: KYLIN-807 Avoid write conflict between job engine and stream cube builder

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 7f5290b36 -> 485044760


KYLIN-807 Avoid write conflict between job engine and stream cube builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/48504476
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/48504476
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/48504476

Branch: refs/heads/0.8.0
Commit: 485044760511ebe4846b8f4140d677cd6837fd41
Parents: 7f5290b
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jun 1 16:49:37 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jun 1 16:49:50 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java |  12 +--
 .../kylin/job/hadoop/hbase/CreateHTableJob.java |  42 +-------
 .../kylin/job/hadoop/hbase/CubeHTableUtil.java  |  82 ++++++++++++++
 .../kylin/job/streaming/CubeStreamBuilder.java  |  22 ++--
 .../kylin/rest/controller/CubeController.java   |  49 +++++++++
 .../kylin/rest/request/CubeSegmentRequest.java  | 108 +++++++++++++++++++
 6 files changed, 259 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 05ef16c..812457a 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -273,10 +273,10 @@ public class CubeManager implements IRealizationProvider {
 
 
     public CubeSegment appendSegments(CubeInstance cube, long endDate) throws IOException {
-        return appendSegments(cube, endDate, true);
+        return appendSegments(cube, endDate, true, true);
     }
 
-    public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean checkNoBuilding) throws IOException {
+    public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean checkNoBuilding, boolean saveChange) throws IOException {
         if (checkNoBuilding)
             checkNoBuildingSegment(cube);
 
@@ -289,7 +289,9 @@ public class CubeManager implements IRealizationProvider {
         }
 
         validateNewSegments(cube, newSegment);
-        saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+
+        if (saveChange)
+            saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
 
         return newSegment;
     }
@@ -317,8 +319,6 @@ public class CubeManager implements IRealizationProvider {
     }
 
     protected void saveCubeSegmentChange(CubeInstance cube, List<CubeSegment> toAdd, List<CubeSegment> toRemove) throws IOException {
-//        cube = this.reloadCubeLocal(cube.getName());
-
         if (toAdd != null && toAdd.size() > 0)
             cube.getSegments().addAll(toAdd);
 
@@ -543,7 +543,7 @@ public class CubeManager implements IRealizationProvider {
         updateCube(cube, true);
     }
 
-    private void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+    public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
         List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
         List<CubeSegment> newList = Arrays.asList(newSegments);
         if (tobe.containsAll(newList) == false) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index e2d97e8..63e66c0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -105,40 +105,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
 
         String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
-        // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
-        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
-        tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-
         Configuration conf = HBaseConfiguration.create(getConf());
-        HBaseAdmin admin = new HBaseAdmin(conf);
 
         try {
-            if (User.isHBaseSecurityEnabled(conf)) {
-                // add coprocessor for bulk load
-                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
-            }
-
-            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-                HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
-                cf.setMaxVersions(1);
-
-                if (LZOSupportnessChecker.getSupportness()) {
-                    logger.info("hbase will use lzo to compress cube data");
-                    cf.setCompressionType(Algorithm.LZO);
-                } else {
-                    logger.info("hbase will not use lzo to compress cube data");
-                }
-
-                cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
-                cf.setInMemory(false);
-                cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
-                tableDesc.addFamily(cf);
-            }
 
             byte[][] splitKeys;
             if (statistics_enabled) {
-
                 List<Integer> rowkeyColumnSize = Lists.newArrayList();
                 long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
                 Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
@@ -154,25 +126,13 @@ public class CreateHTableJob extends AbstractHadoopJob {
                 splitKeys = getSplits(conf, partitionFilePath);
             }
 
-            if (admin.tableExists(tableName)) {
-                // admin.disableTable(tableName);
-                // admin.deleteTable(tableName);
-                throw new RuntimeException("HBase table " + tableName + " exists!");
-            }
-
-            DeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
-            admin.createTable(tableDesc, splitKeys);
-            logger.info("create hbase table " + tableName + " done.");
-
+            CubeHTableUtil.createHTable(cubeDesc, tableName, splitKeys);
             return 0;
         } catch (Exception e) {
             printUsage(options);
             e.printStackTrace(System.err);
             logger.error(e.getLocalizedMessage(), e);
             return 2;
-        } finally {
-            admin.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
new file mode 100644
index 0000000..d281d8d
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
@@ -0,0 +1,82 @@
+package org.apache.kylin.job.hadoop.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.job.tools.DeployCoprocessorCLI;
+import org.apache.kylin.job.tools.LZOSupportnessChecker;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Static helper that creates the HBase table for a cube (logic moved out of CreateHTableJob). */
+public class CubeHTableUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeHTableUtil.class);
+    /** Creates table {@code tableName} with one column family per entry of the cube's HBase mapping; {@code splitKeys} is handed to HBaseAdmin.createTable as-is. Throws RuntimeException if the table already exists. */
+    public static void createHTable(CubeDesc cubeDesc, String tableName, byte[][] splitKeys) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+        // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
+        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+        tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); // tag the table with this Kylin instance's metadata URL prefix
+
+        Configuration conf = HBaseConfiguration.create(); // NOTE(review): CreateHTableJob builds its conf with create(getConf()); confirm that not layering a job configuration here is intended
+        HBaseAdmin admin = new HBaseAdmin(conf);
+
+        try {
+            if (User.isHBaseSecurityEnabled(conf)) {
+                // add coprocessor for bulk load
+                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+            }
+
+            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+                HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
+                cf.setMaxVersions(1);
+
+                if (LZOSupportnessChecker.getSupportness()) {
+                    logger.info("hbase will use lzo to compress cube data");
+                    cf.setCompressionType(Compression.Algorithm.LZO);
+                } else { // no compression type is set on the family in this branch
+                    logger.info("hbase will not use lzo to compress cube data");
+                }
+
+                cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+                cf.setInMemory(false);
+                cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
+                tableDesc.addFamily(cf);
+            }
+
+            if (admin.tableExists(tableName)) {
+                // admin.disableTable(tableName);
+                // admin.deleteTable(tableName);
+                throw new RuntimeException("HBase table " + tableName + " exists!"); // fail instead of replacing the existing table
+            }
+
+            DeployCoprocessorCLI.deployCoprocessor(tableDesc); // presumably attaches Kylin's coprocessor to the descriptor — helper not shown here, confirm
+
+            admin.createTable(tableDesc, splitKeys);
+            logger.info("create hbase table " + tableName + " done.");
+        } catch (Exception e) { // log for diagnostics, then rethrow the same exception to the caller
+            logger.error("Failed to create HTable", e);
+            throw e;
+        } finally {
+            admin.close(); // runs on both the success and the failure path
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index a164396..baa353a 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -20,7 +20,11 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.util.ToolRunner;
@@ -51,6 +55,7 @@ import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
 import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
 import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
 import org.apache.kylin.job.inmemcubing.ICuboidWriter;
 import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -104,9 +109,9 @@ public class CubeStreamBuilder extends StreamBuilder {
         LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
         blockingQueue.put(Collections.<String>emptyList());
 
-        final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false);
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false, false);
         final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
 
         final Configuration conf = HadoopUtil.getCurrentConfiguration();
@@ -346,7 +351,11 @@ public class CubeStreamBuilder extends StreamBuilder {
 
     private void commitSegment(CubeSegment cubeSegment) throws IOException {
         cubeSegment.setStatus(SegmentStatusEnum.READY);
-        CubeManager.getInstance(kylinConfig).updateCube(cubeSegment.getCubeInstance(), true);
+
+        CubeInstance cube = CubeManager.getInstance(kylinConfig).reloadCubeLocal(cubeSegment.getCubeInstance().getName());
+        cube.getSegments().add(cubeSegment);
+        Collections.sort(cube.getSegments());
+        CubeManager.getInstance(kylinConfig).updateCube(cube, true);
     }
 
     private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
@@ -377,12 +386,7 @@ public class CubeStreamBuilder extends StreamBuilder {
 
     private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
         final String hTableName = cubeSegment.getStorageLocationIdentifier();
-        String[] args = new String[]{"-cubename", cubeName,
-                "-segmentname", cubeSegment.getName(),
-                "-input", "/empty",
-                "-htablename", hTableName,
-                "-statisticsenabled", "true"};
-        ToolRunner.run(new CreateHTableJob(), args);
+        CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
         final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
         logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
         return hTable;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 466a007..a7df432 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -39,6 +39,7 @@ import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CubeRequest;
+import org.apache.kylin.rest.request.CubeSegmentRequest;
 import org.apache.kylin.rest.request.JobBuildRequest;
 import org.apache.kylin.rest.response.GeneralResponse;
 import org.apache.kylin.rest.response.HBaseResponse;
@@ -405,6 +406,54 @@ public class CubeController extends BasicController {
         return hbase;
     }
 
+
+    @RequestMapping(value = "/{cubeName}/segments", method = {RequestMethod.POST}) // POST endpoint: adds the segment carried in the request body to the named cube and saves the cube
+    @ResponseBody
+    @Metered(name = "appendSegment")
+    public CubeSegmentRequest appendSegment(@PathVariable String cubeName, @RequestBody CubeSegmentRequest cubeSegmentRequest) {
+        CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+        if (null == cube) {
+            throw new InternalErrorException("Cannot find cube " + cubeName);
+        }
+
+        CubeSegment segment = deserializeCubeSegment(cubeSegmentRequest); // NOTE(review): null when the JSON fails to parse/map; nothing below checks for that
+
+        cubeService.getCubeManager().validateNewSegments(cube, segment);
+        cube.getSegments().add(segment);
+        Collections.sort(cube.getSegments()); // re-sort the segment list after the insert
+        try {
+            cubeService.getCubeManager().updateCube(cube, true);
+        } catch (IOException e) {
+            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage()); // NOTE(review): the cause is not chained here, unlike in deserializeCubeSegment
+        }
+
+        cubeSegmentRequest.setSuccessful(true); // the incoming request object doubles as the response
+        cubeSegmentRequest.setMessage("Successfully append cube segment " + segment.getName());
+        return cubeSegmentRequest;
+    }
+
+    private CubeSegment deserializeCubeSegment(CubeSegmentRequest cubeSegmentRequest) { // parses the request's cubeSegmentData JSON into a CubeSegment; returns null if it cannot be parsed or mapped
+        CubeSegment segment = null;
+        try {
+            logger.debug("Saving cube segment " + cubeSegmentRequest.getCubeSegmentData());
+            segment = JsonUtil.readValue(cubeSegmentRequest.getCubeSegmentData(), CubeSegment.class);
+        } catch (JsonParseException e) {
+            logger.error("The cube definition is not valid.", e); // NOTE(review): message says "cube definition" although the payload is a cube segment
+            cubeSegmentRequest.setSuccessful(false); // failure is recorded on the request; execution falls through and null is returned
+            cubeSegmentRequest.setMessage(e.getMessage());
+        } catch (JsonMappingException e) {
+            logger.error("The cube definition is not valid.", e); // same handling as the JsonParseException branch above
+            cubeSegmentRequest.setSuccessful(false);
+            cubeSegmentRequest.setMessage(e.getMessage());
+        } catch (IOException e) { // any other I/O failure is surfaced to the caller as an InternalErrorException
+            logger.error("Failed to deal with the request.", e);
+            throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+        }
+        return segment;
+    }
+
+
     private CubeDesc deserializeCubeDesc(CubeRequest cubeRequest) {
         CubeDesc desc = null;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java b/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java
new file mode 100644
index 0000000..b3d57d0
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+public class CubeSegmentRequest {
+    /* Bean used as both the request body and the response of CubeController.appendSegment; cubeSegmentData carries the segment as JSON. */
+    private String uuid;
+    private String cubeName;
+    private String cubeSegmentData;
+    private boolean successful;
+    private String message;
+    private String project;
+
+    public String getUuid() {
+        return uuid;
+    }
+
+    public void setUuid(String uuid) {
+        this.uuid = uuid;
+    }
+
+    /**
+     * @return the message
+     */
+    public String getMessage() {
+        return message;
+    }
+
+    /**
+     * @param message
+     *            the message to set
+     */
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the successful flag
+     */
+    public boolean getSuccessful() {
+        return successful;
+    }
+
+    /**
+     * @param status
+     *            the new value of the successful flag
+     */
+    public void setSuccessful(boolean status) {
+        this.successful = status;
+    }
+
+    public CubeSegmentRequest() {
+    }
+
+    public CubeSegmentRequest(String cubeName, String cubeDescData) {
+        this.cubeName = cubeName;
+        this.cubeSegmentData = cubeDescData; // despite the parameter name, this value is stored as the cube segment data
+    }
+
+    public String getCubeSegmentData() {
+        return cubeSegmentData;
+    }
+
+    public void setCubeSegmentData(String cubeSegmentData) {
+        this.cubeSegmentData = cubeSegmentData;
+    }
+
+
+    /**
+     * @return the cube name
+     */
+    public String getCubeName() {
+        return cubeName;
+    }
+
+    /**
+     * @param cubeName
+     *            the cube name to set
+     */
+    public void setCubeName(String cubeName) {
+        this.cubeName = cubeName;
+    }
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+}