You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/08 02:18:42 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4818 Not store HLL binary file into RDBMS directly

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 124dd05  KYLIN-4818 Not store HLL binary file into RDBMS directly
124dd05 is described below

commit 124dd054923b9e41f37c725494ff1cadac496eee
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Thu Jan 7 20:43:55 2021 +0800

    KYLIN-4818 Not store HLL binary file into RDBMS directly
---
 .../apache/kylin/common/persistence/ResourceStore.java  | 17 ++++++++++++-----
 .../main/java/org/apache/kylin/cube/CubeSegment.java    |  6 +++---
 .../kylin/engine/spark/utils/UpdateMetadataUtil.java    |  9 +++------
 .../org/apache/kylin/engine/spark/job/CubeBuildJob.java |  9 +++------
 .../apache/kylin/storage/hbase/HBaseResourceStore.java  |  3 ++-
 5 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 80d66af..65d4f59 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.common.persistence;
 
@@ -138,7 +138,7 @@ abstract public class ResourceStore {
                     StringEntity.serializer);
         }
         StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.serializer);
-        return entity == null ? "":entity.toString();
+        return entity == null ? "" : entity.toString();
     }
 
     /**
@@ -361,6 +361,13 @@ abstract public class ResourceStore {
         return writer.bytesWritten();
     }
 
+    final public void putBigResource(String resPath, InputStream content, long ts) throws IOException {
+        resPath = norm(resPath);
+        ContentWriter writer = ContentWriter.create(content);
+        writer.markBigContent();
+        putResourceCheckpoint(resPath, writer, ts);
+    }
+
     /**
      * Overwrite a resource without write conflict check
      * @return bytes written
@@ -447,7 +454,7 @@ abstract public class ResourceStore {
             throws IOException, WriteConflictException;
 
     private long checkAndPutResourceWithRetry(final String resPath, final byte[] content, final long oldTS,
-            final long newTS) throws IOException, WriteConflictException {
+                                              final long newTS) throws IOException, WriteConflictException {
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this);
         return retry.doWithRetry(() -> checkAndPutResourceImpl(resPath, content, oldTS, newTS));
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 706cd97..1cd3aa5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -541,7 +541,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     }
 
     public String getPreciseStatisticsResourcePath() {
-        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), "json");
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), "");
     }
 
     public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index 5560a1c..0987842 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -40,7 +40,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.job.NSparkExecutable;
@@ -50,8 +49,6 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS;
-
 public class UpdateMetadataUtil {
 
     protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class);
@@ -80,12 +77,12 @@ public class UpdateMetadataUtil {
                             currentInstanceCopy.toString(), toUpdateSeg.toString(), tobeSegments.toString()));
 
         String resKey = toUpdateSeg.getStatisticsResourcePath();
-        String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(currentInstanceCopy.getConfig().getHdfsWorkingDirectory(), nsparkExecutable.getParam(MetadataConstants.P_JOB_ID));
-        Path statisticsFile = new Path(jobWorkingDirPath + "/" + segmentId + "/" + CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+        String jobTmpDir = config.getJobTmpDir(currentInstanceCopy.getProject()) + "/" + nsparkExecutable.getParam(MetadataConstants.P_JOB_ID);
+        Path statisticsFile = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeId + "/" + segmentId + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         if (fs.exists(statisticsFile)) {
             FSDataInputStream is = fs.open(statisticsFile);
-            ResourceStore.getStore(config).putResource(resKey, is, System.currentTimeMillis());
+            ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis());
         }
 
         CubeUpdate update = new CubeUpdate(currentInstanceCopy);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 89ecad4..51f9f2c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -85,8 +84,6 @@ import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 
-import static org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS;
-
 public class CubeBuildJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class);
     protected static String TEMP_DIR_SUFFIX = "_temp";
@@ -119,7 +116,7 @@ public class CubeBuildJob extends SparkApplication {
         cubeManager = CubeManager.getInstance(config);
         cubeInstance = cubeManager.getCubeByUuid(cubeName);
         CubeSegment newSegment = cubeInstance.getSegmentById(firstSegmentId);
-        SpanningTree spanningTree ;
+        SpanningTree spanningTree;
         ParentSourceChooser sourceChooser;
 
         // Cuboid Statistics is served for Cube Planner Phase One at the moment
@@ -140,8 +137,8 @@ public class CubeBuildJob extends SparkApplication {
             logger.info("Cuboid statistics return {} records and cost {} ms.", hllMap.size(), (System.currentTimeMillis() - startMills));
 
             // 1.2 Save cuboid statistics
-            String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId);
-            Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS);
+            String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId;
+            Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + firstSegmentId + "/");
             Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate));
             long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L);
             CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc);
diff --git a/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index e5a2595..ab077d8 100644
--- a/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -61,6 +61,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+@Deprecated
 public class HBaseResourceStore extends PushdownResourceStore {
 
     private static Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class);
@@ -147,7 +148,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
 
     @Override
     protected void visitFolderImpl(String folderPath, final boolean recursive, VisitFilter filter,
-            final boolean loadContent, final Visitor visitor) throws IOException {
+                                   final boolean loadContent, final Visitor visitor) throws IOException {
 
         visitFolder(folderPath, filter, loadContent, new FolderVisitor() {
             @Override