You are viewing a plain-text version of this content. (The link to the canonical version, originally given as "here", was lost in the plain-text rendering.)
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/01 10:50:04 UTC
incubator-kylin git commit: KYLIN-807 Avoid write conflict between job engine and stream cube builder
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 7f5290b36 -> 485044760
KYLIN-807 Avoid write conflict between job engine and stream cube builder
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/48504476
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/48504476
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/48504476
Branch: refs/heads/0.8.0
Commit: 485044760511ebe4846b8f4140d677cd6837fd41
Parents: 7f5290b
Author: shaofengshi <sh...@apache.org>
Authored: Mon Jun 1 16:49:37 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jun 1 16:49:50 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 12 +--
.../kylin/job/hadoop/hbase/CreateHTableJob.java | 42 +-------
.../kylin/job/hadoop/hbase/CubeHTableUtil.java | 82 ++++++++++++++
.../kylin/job/streaming/CubeStreamBuilder.java | 22 ++--
.../kylin/rest/controller/CubeController.java | 49 +++++++++
.../kylin/rest/request/CubeSegmentRequest.java | 108 +++++++++++++++++++
6 files changed, 259 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 05ef16c..812457a 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -273,10 +273,10 @@ public class CubeManager implements IRealizationProvider {
public CubeSegment appendSegments(CubeInstance cube, long endDate) throws IOException {
- return appendSegments(cube, endDate, true);
+ return appendSegments(cube, endDate, true, true);
}
- public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean checkNoBuilding) throws IOException {
+ public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean checkNoBuilding, boolean saveChange) throws IOException {
if (checkNoBuilding)
checkNoBuildingSegment(cube);
@@ -289,7 +289,9 @@ public class CubeManager implements IRealizationProvider {
}
validateNewSegments(cube, newSegment);
- saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+
+ if (saveChange)
+ saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
return newSegment;
}
@@ -317,8 +319,6 @@ public class CubeManager implements IRealizationProvider {
}
protected void saveCubeSegmentChange(CubeInstance cube, List<CubeSegment> toAdd, List<CubeSegment> toRemove) throws IOException {
-// cube = this.reloadCubeLocal(cube.getName());
-
if (toAdd != null && toAdd.size() > 0)
cube.getSegments().addAll(toAdd);
@@ -543,7 +543,7 @@ public class CubeManager implements IRealizationProvider {
updateCube(cube, true);
}
- private void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+ public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
List<CubeSegment> newList = Arrays.asList(newSegments);
if (tobe.containsAll(newList) == false) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index e2d97e8..63e66c0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -105,40 +105,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
- // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
- tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
- tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-
Configuration conf = HBaseConfiguration.create(getConf());
- HBaseAdmin admin = new HBaseAdmin(conf);
try {
- if (User.isHBaseSecurityEnabled(conf)) {
- // add coprocessor for bulk load
- tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
- }
-
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
- cf.setMaxVersions(1);
-
- if (LZOSupportnessChecker.getSupportness()) {
- logger.info("hbase will use lzo to compress cube data");
- cf.setCompressionType(Algorithm.LZO);
- } else {
- logger.info("hbase will not use lzo to compress cube data");
- }
-
- cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
- cf.setInMemory(false);
- cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
- tableDesc.addFamily(cf);
- }
byte[][] splitKeys;
if (statistics_enabled) {
-
List<Integer> rowkeyColumnSize = Lists.newArrayList();
long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
@@ -154,25 +126,13 @@ public class CreateHTableJob extends AbstractHadoopJob {
splitKeys = getSplits(conf, partitionFilePath);
}
- if (admin.tableExists(tableName)) {
- // admin.disableTable(tableName);
- // admin.deleteTable(tableName);
- throw new RuntimeException("HBase table " + tableName + " exists!");
- }
-
- DeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
- admin.createTable(tableDesc, splitKeys);
- logger.info("create hbase table " + tableName + " done.");
-
+ CubeHTableUtil.createHTable(cubeDesc, tableName, splitKeys);
return 0;
} catch (Exception e) {
printUsage(options);
e.printStackTrace(System.err);
logger.error(e.getLocalizedMessage(), e);
return 2;
- } finally {
- admin.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
new file mode 100644
index 0000000..d281d8d
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
@@ -0,0 +1,82 @@
+package org.apache.kylin.job.hadoop.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.job.tools.DeployCoprocessorCLI;
+import org.apache.kylin.job.tools.LZOSupportnessChecker;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ */
+public class CubeHTableUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeHTableUtil.class);
+
+ public static void createHTable(CubeDesc cubeDesc, String tableName, byte[][] splitKeys) throws IOException {
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+ HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+ // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
+ tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+ tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin admin = new HBaseAdmin(conf);
+
+ try {
+ if (User.isHBaseSecurityEnabled(conf)) {
+ // add coprocessor for bulk load
+ tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+ }
+
+ for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+ HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
+ cf.setMaxVersions(1);
+
+ if (LZOSupportnessChecker.getSupportness()) {
+ logger.info("hbase will use lzo to compress cube data");
+ cf.setCompressionType(Compression.Algorithm.LZO);
+ } else {
+ logger.info("hbase will not use lzo to compress cube data");
+ }
+
+ cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+ cf.setInMemory(false);
+ cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
+ tableDesc.addFamily(cf);
+ }
+
+ if (admin.tableExists(tableName)) {
+ // admin.disableTable(tableName);
+ // admin.deleteTable(tableName);
+ throw new RuntimeException("HBase table " + tableName + " exists!");
+ }
+
+ DeployCoprocessorCLI.deployCoprocessor(tableDesc);
+
+ admin.createTable(tableDesc, splitKeys);
+ logger.info("create hbase table " + tableName + " done.");
+ } catch (Exception e) {
+ logger.error("Failed to create HTable", e);
+ throw e;
+ } finally {
+ admin.close();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index a164396..baa353a 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -20,7 +20,11 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.util.ToolRunner;
@@ -51,6 +55,7 @@ import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
+import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
import org.apache.kylin.job.inmemcubing.ICuboidWriter;
import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -104,9 +109,9 @@ public class CubeStreamBuilder extends StreamBuilder {
LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
blockingQueue.put(Collections.<String>emptyList());
- final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
- final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false);
+ final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false, false);
final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
final Configuration conf = HadoopUtil.getCurrentConfiguration();
@@ -346,7 +351,11 @@ public class CubeStreamBuilder extends StreamBuilder {
private void commitSegment(CubeSegment cubeSegment) throws IOException {
cubeSegment.setStatus(SegmentStatusEnum.READY);
- CubeManager.getInstance(kylinConfig).updateCube(cubeSegment.getCubeInstance(), true);
+
+ CubeInstance cube = CubeManager.getInstance(kylinConfig).reloadCubeLocal(cubeSegment.getCubeInstance().getName());
+ cube.getSegments().add(cubeSegment);
+ Collections.sort(cube.getSegments());
+ CubeManager.getInstance(kylinConfig).updateCube(cube, true);
}
private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
@@ -377,12 +386,7 @@ public class CubeStreamBuilder extends StreamBuilder {
private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
final String hTableName = cubeSegment.getStorageLocationIdentifier();
- String[] args = new String[]{"-cubename", cubeName,
- "-segmentname", cubeSegment.getName(),
- "-input", "/empty",
- "-htablename", hTableName,
- "-statisticsenabled", "true"};
- ToolRunner.run(new CreateHTableJob(), args);
+ CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName);
logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!");
return hTable;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 466a007..a7df432 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -39,6 +39,7 @@ import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.CubeRequest;
+import org.apache.kylin.rest.request.CubeSegmentRequest;
import org.apache.kylin.rest.request.JobBuildRequest;
import org.apache.kylin.rest.response.GeneralResponse;
import org.apache.kylin.rest.response.HBaseResponse;
@@ -405,6 +406,54 @@ public class CubeController extends BasicController {
return hbase;
}
+
+ @RequestMapping(value = "/{cubeName}/segments", method = {RequestMethod.POST})
+ @ResponseBody
+ @Metered(name = "appendSegment")
+ public CubeSegmentRequest appendSegment(@PathVariable String cubeName, @RequestBody CubeSegmentRequest cubeSegmentRequest) {
+ CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+ if (null == cube) {
+ throw new InternalErrorException("Cannot find cube " + cubeName);
+ }
+
+ CubeSegment segment = deserializeCubeSegment(cubeSegmentRequest);
+
+ cubeService.getCubeManager().validateNewSegments(cube, segment);
+ cube.getSegments().add(segment);
+ Collections.sort(cube.getSegments());
+ try {
+ cubeService.getCubeManager().updateCube(cube, true);
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
+ throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
+ }
+
+ cubeSegmentRequest.setSuccessful(true);
+ cubeSegmentRequest.setMessage("Successfully append cube segment " + segment.getName());
+ return cubeSegmentRequest;
+ }
+
+ private CubeSegment deserializeCubeSegment(CubeSegmentRequest cubeSegmentRequest) {
+ CubeSegment segment = null;
+ try {
+ logger.debug("Saving cube segment " + cubeSegmentRequest.getCubeSegmentData());
+ segment = JsonUtil.readValue(cubeSegmentRequest.getCubeSegmentData(), CubeSegment.class);
+ } catch (JsonParseException e) {
+ logger.error("The cube definition is not valid.", e);
+ cubeSegmentRequest.setSuccessful(false);
+ cubeSegmentRequest.setMessage(e.getMessage());
+ } catch (JsonMappingException e) {
+ logger.error("The cube definition is not valid.", e);
+ cubeSegmentRequest.setSuccessful(false);
+ cubeSegmentRequest.setMessage(e.getMessage());
+ } catch (IOException e) {
+ logger.error("Failed to deal with the request.", e);
+ throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
+ }
+ return segment;
+ }
+
+
private CubeDesc deserializeCubeDesc(CubeRequest cubeRequest) {
CubeDesc desc = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/48504476/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java b/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java
new file mode 100644
index 0000000..b3d57d0
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/request/CubeSegmentRequest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+public class CubeSegmentRequest {
+
+ private String uuid;
+ private String cubeName;
+ private String cubeSegmentData;
+ private boolean successful;
+ private String message;
+ private String project;
+
+ public String getUuid() {
+ return uuid;
+ }
+
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ /**
+ * @return the message
+ */
+ public String getMessage() {
+ return message;
+ }
+
+ /**
+ * @param message
+ * the message to set
+ */
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ /**
+ * @return the status
+ */
+ public boolean getSuccessful() {
+ return successful;
+ }
+
+ /**
+ * @param status
+ * the status to set
+ */
+ public void setSuccessful(boolean status) {
+ this.successful = status;
+ }
+
+ public CubeSegmentRequest() {
+ }
+
+ public CubeSegmentRequest(String cubeName, String cubeDescData) {
+ this.cubeName = cubeName;
+ this.cubeSegmentData = cubeDescData;
+ }
+
+ public String getCubeSegmentData() {
+ return cubeSegmentData;
+ }
+
+ public void setCubeSegmentData(String cubeSegmentData) {
+ this.cubeSegmentData = cubeSegmentData;
+ }
+
+
+ /**
+ * @return the cubeDescName
+ */
+ public String getCubeName() {
+ return cubeName;
+ }
+
+ /**
+ * @param cubeName
+ * the cubeDescName to set
+ */
+ public void setCubeName(String cubeName) {
+ this.cubeName = cubeName;
+ }
+
+ public String getProject() {
+ return project;
+ }
+
+ public void setProject(String project) {
+ this.project = project;
+ }
+
+}