You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/08 02:18:42 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4818 Not store HLL binary file into RDBMS directly
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 124dd05 KYLIN-4818 Not store HLL binary file into RDBMS directly
124dd05 is described below
commit 124dd054923b9e41f37c725494ff1cadac496eee
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Thu Jan 7 20:43:55 2021 +0800
KYLIN-4818 Not store HLL binary file into RDBMS directly
---
.../apache/kylin/common/persistence/ResourceStore.java | 17 ++++++++++++-----
.../main/java/org/apache/kylin/cube/CubeSegment.java | 6 +++---
.../kylin/engine/spark/utils/UpdateMetadataUtil.java | 9 +++------
.../org/apache/kylin/engine/spark/job/CubeBuildJob.java | 9 +++------
.../apache/kylin/storage/hbase/HBaseResourceStore.java | 3 ++-
5 files changed, 23 insertions(+), 21 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 80d66af..65d4f59 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.common.persistence;
@@ -138,7 +138,7 @@ abstract public class ResourceStore {
StringEntity.serializer);
}
StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.serializer);
- return entity == null ? "":entity.toString();
+ return entity == null ? "" : entity.toString();
}
/**
@@ -361,6 +361,13 @@ abstract public class ResourceStore {
return writer.bytesWritten();
}
+ final public void putBigResource(String resPath, InputStream content, long ts) throws IOException {
+ resPath = norm(resPath);
+ ContentWriter writer = ContentWriter.create(content);
+ writer.markBigContent();
+ putResourceCheckpoint(resPath, writer, ts);
+ }
+
/**
* Overwrite a resource without write conflict check
* @return bytes written
@@ -447,7 +454,7 @@ abstract public class ResourceStore {
throws IOException, WriteConflictException;
private long checkAndPutResourceWithRetry(final String resPath, final byte[] content, final long oldTS,
- final long newTS) throws IOException, WriteConflictException {
+ final long newTS) throws IOException, WriteConflictException {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
return retry.doWithRetry(() -> checkAndPutResourceImpl(resPath, content, oldTS, newTS));
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 706cd97..1cd3aa5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -541,7 +541,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
}
public String getPreciseStatisticsResourcePath() {
- return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), "json");
+ return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), "");
}
public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index 5560a1c..0987842 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -40,7 +40,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.spark.job.NSparkExecutable;
@@ -50,8 +49,6 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS;
-
public class UpdateMetadataUtil {
protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class);
@@ -80,12 +77,12 @@ public class UpdateMetadataUtil {
currentInstanceCopy.toString(), toUpdateSeg.toString(), tobeSegments.toString()));
String resKey = toUpdateSeg.getStatisticsResourcePath();
- String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(currentInstanceCopy.getConfig().getHdfsWorkingDirectory(), nsparkExecutable.getParam(MetadataConstants.P_JOB_ID));
- Path statisticsFile = new Path(jobWorkingDirPath + "/" + segmentId + "/" + CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+ String jobTmpDir = config.getJobTmpDir(currentInstanceCopy.getProject()) + "/" + nsparkExecutable.getParam(MetadataConstants.P_JOB_ID);
+ Path statisticsFile = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeId + "/" + segmentId + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
if (fs.exists(statisticsFile)) {
FSDataInputStream is = fs.open(statisticsFile);
- ResourceStore.getStore(config).putResource(resKey, is, System.currentTimeMillis());
+ ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
}
CubeUpdate update = new CubeUpdate(currentInstanceCopy);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 89ecad4..51f9f2c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -85,8 +84,6 @@ import org.apache.kylin.shaded.com.google.common.collect.Sets;
import scala.Tuple2;
import scala.collection.JavaConversions;
-import static org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS;
-
public class CubeBuildJob extends SparkApplication {
protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class);
protected static String TEMP_DIR_SUFFIX = "_temp";
@@ -119,7 +116,7 @@ public class CubeBuildJob extends SparkApplication {
cubeManager = CubeManager.getInstance(config);
cubeInstance = cubeManager.getCubeByUuid(cubeName);
CubeSegment newSegment = cubeInstance.getSegmentById(firstSegmentId);
- SpanningTree spanningTree ;
+ SpanningTree spanningTree;
ParentSourceChooser sourceChooser;
// Cuboid Statistics is served for Cube Planner Phase One at the moment
@@ -140,8 +137,8 @@ public class CubeBuildJob extends SparkApplication {
logger.info("Cuboid statistics return {} records and cost {} ms.", hllMap.size(), (System.currentTimeMillis() - startMills));
// 1.2 Save cuboid statistics
- String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId);
- Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS);
+ String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId;
+ Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + firstSegmentId + "/");
Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate));
long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L);
CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc);
diff --git a/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index e5a2595..ab077d8 100644
--- a/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -61,6 +61,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+@Deprecated
public class HBaseResourceStore extends PushdownResourceStore {
private static Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class);
@@ -147,7 +148,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
@Override
protected void visitFolderImpl(String folderPath, final boolean recursive, VisitFilter filter,
- final boolean loadContent, final Visitor visitor) throws IOException {
+ final boolean loadContent, final Visitor visitor) throws IOException {
visitFolder(folderPath, filter, loadContent, new FolderVisitor() {
@Override