You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/23 03:14:32 UTC

[kylin] 19/30: KYLIN-3963 Provide a custom serializer for PercentileCounter

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit a8667df713e2e943262b83f6c5025d0e49671d01
Author: yanghua <ya...@gmail.com>
AuthorDate: Sat Apr 20 10:46:20 2019 +0800

    KYLIN-3963 Provide a custom serializer for PercentileCounter
---
 .../kylin/engine/flink/FlinkCubingByLayer.java     |  4 ++
 .../kylin/engine/flink/FlinkCubingMerge.java       |  4 ++
 .../flink/util/PercentileCounterSerializer.java    | 57 ++++++++++++++++++++++
 3 files changed, 65 insertions(+)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index b8ddf95..1393cfc 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -49,6 +49,7 @@ import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.flink.util.PercentileCounterSerializer;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
@@ -60,6 +61,7 @@ import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.percentile.PercentileCounter;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,6 +155,8 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().registerKryoType(PercentileCounter.class);
+        env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);
 
         DataSet<String[]> hiveDataSet = FlinkUtil.readHiveRecords(isSequenceFile, env, inputPath, hiveTable, job);
 
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index fcf8d6c..6a4ed66 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -46,6 +46,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.flink.util.PercentileCounterSerializer;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -55,6 +56,7 @@ import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.percentile.PercentileCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,6 +127,8 @@ public class FlinkCubingMerge extends AbstractApplication implements Serializabl
         FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
 
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().registerKryoType(PercentileCounter.class);
+        env.getConfig().registerTypeWithKryoSerializer(PercentileCounter.class, PercentileCounterSerializer.class);
 
         final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
 
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
new file mode 100644
index 0000000..684f85e
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.kylin.engine.flink.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.kylin.measure.percentile.PercentileCounter;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A customized Kryo serializer for {@link PercentileCounter}.
+ *
+ * <p>Wire format, in order: compression (double), quantile ratio (double),
+ * payload length in bytes (int), then the registers as encoded by
+ * {@code asSmallBytes}.
+ */
+public class PercentileCounterSerializer extends Serializer<PercentileCounter> {
+
+    /**
+     * Writes the counter's compression, quantile ratio and length-prefixed registers.
+     *
+     * @param kryo    the Kryo instance (unused)
+     * @param output  the stream to write to
+     * @param counter the counter to serialize
+     */
+    @Override
+    public void write(Kryo kryo, Output output, PercentileCounter counter) {
+        // Allocate by byteSize() and let asSmallBytes fill the buffer; only the bytes
+        // actually written (buffer.position()) are emitted.
+        int length = counter.getRegisters().byteSize();
+        ByteBuffer buffer = ByteBuffer.allocate(length);
+        counter.getRegisters().asSmallBytes(buffer);
+        output.writeDouble(counter.getCompression());
+        output.writeDouble(counter.getQuantileRatio());
+        output.writeInt(buffer.position());
+        output.write(buffer.array(), 0, buffer.position());
+    }
+
+    /**
+     * Reads back a counter in the format produced by {@link #write}.
+     *
+     * @param kryo  the Kryo instance (unused)
+     * @param input the stream to read from
+     * @param type  the requested class (unused)
+     * @return a new counter with the deserialized registers
+     */
+    @Override
+    public PercentileCounter read(Kryo kryo, Input input, Class type) {
+        double compression = input.readDouble();
+        double quantileRatio = input.readDouble();
+        int length = input.readInt();
+        // readBytes() reads exactly `length` bytes or throws. Input.read(byte[]) has
+        // InputStream semantics and may return a short read when the payload spans an
+        // internal buffer refill, which would leave the registers partially zero-filled.
+        byte[] buffer = input.readBytes(length);
+        PercentileCounter counter = new PercentileCounter(compression, quantileRatio);
+        counter.readRegisters(ByteBuffer.wrap(buffer));
+        return counter;
+    }
+}