Posted to commits@carbondata.apache.org by aj...@apache.org on 2019/12/10 02:13:12 UTC

[carbondata] branch master updated: [CARBONDATA-3585] Handle Range Compaction failure in case of KryoSerializer

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ba073b3  [CARBONDATA-3585] Handle Range Compaction failure in case of KryoSerializer
ba073b3 is described below

commit ba073b38f5ecdc900e079c4abd1f53bf16f937b0
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Fri Nov 15 16:39:22 2019 +0530

    [CARBONDATA-3585] Handle Range Compaction failure in case of KryoSerializer
    
    Problem: Range compaction fails when Spark is configured with the
    KryoSerializer.
    Solution: Serialize the splits into a byte array and broadcast that
    to all the executors when a range column is present.
    
    This closes #3462
---
 .../carbondata/hadoop/CarbonInputSplitWrapper.java | 68 ++++++++++++++++++++++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |  8 +--
 2 files changed, 72 insertions(+), 4 deletions(-)
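
Background for the diff below: Spark encodes broadcast variables with whatever
serializer the spark.serializer setting names. The previous code broadcast a
util.List[CarbonInputSplit] directly, which works under the default Java
serialization but fails under Kryo, presumably because CarbonInputSplit is a
Hadoop Writable-style class that Kryo cannot reconstruct out of the box. The
new wrapper sidesteps the serializer: it writes the splits through their own
write()/readFields() methods into a plain byte[], which any serializer can
ship. A minimal sketch of the configuration that triggers the original
failure (spark.serializer is the standard Spark key; the app name and local
master are illustrative):

    import org.apache.spark.SparkConf;

    public class KryoConfSketch {
      public static void main(String[] args) {
        // With this setting, broadcast values go through Kryo instead of
        // Java serialization, so every broadcast payload must be
        // Kryo-friendly. A byte[]-plus-int wrapper trivially is.
        SparkConf conf = new SparkConf()
            .setAppName("carbon-range-compaction-sketch")
            .setMaster("local[*]")
            .set("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
      }
    }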

diff --git a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplitWrapper.java b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplitWrapper.java
new file mode 100644
index 0000000..d9b8293
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplitWrapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public class CarbonInputSplitWrapper implements Serializable {
+  private byte[] data;
+  private int size;
+
+  public CarbonInputSplitWrapper(List<CarbonInputSplit> inputSplitList) {
+    ExtendedByteArrayOutputStream stream = new ExtendedByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(stream);
+    try {
+      for (CarbonInputSplit carbonInputSplit : inputSplitList) {
+        carbonInputSplit.write(dos);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      CarbonUtil.closeStreams(dos);
+    }
+    this.data = stream.getBuffer();
+    this.size = inputSplitList.size();
+  }
+
+  public List<CarbonInputSplit> getInputSplit() {
+    ByteArrayInputStream stream = new ByteArrayInputStream(data);
+    DataInputStream dis = new DataInputStream(stream);
+    List<CarbonInputSplit> splits = new ArrayList<>();
+    try {
+      for (int i = 0; i < size; i++) {
+        CarbonInputSplit split = new CarbonInputSplit();
+        split.readFields(dis);
+        splits.add(split);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      CarbonUtil.closeStreams(dis);
+    }
+    return splits;
+  }
+}
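
A hedged usage sketch of the new class. The empty splits list here is a
stand-in; in the real code path CarbonMergerRDD obtains the splits from the
table input format:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.hadoop.CarbonInputSplit;
    import org.apache.carbondata.hadoop.CarbonInputSplitWrapper;

    public class WrapperRoundTrip {
      public static void main(String[] args) {
        // Stand-in list; real callers get splits from the input format.
        List<CarbonInputSplit> splits = new ArrayList<>();
        // Serialize once on the driver into a byte[] plus a split count.
        CarbonInputSplitWrapper wrapper = new CarbonInputSplitWrapper(splits);
        // Rebuild the splits via Writable.readFields on the other side.
        List<CarbonInputSplit> restored = wrapper.getInputSplit();
        System.out.println("restored " + restored.size() + " splits");
      }
    }

Because the constructor records the list size, getInputSplit() reads back
exactly that many splits and ignores anything after them, so any slack at the
end of the buffer returned by getBuffer() is harmless. Note it also
re-deserializes on every call, so callers should unwrap once and reuse the
result.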
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 65fc9f3..0f31471 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -56,7 +56,7 @@ import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonInputSplitWrapper, CarbonMultiBlockSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
 import org.apache.carbondata.processing.loading.TableProcessingOperations
@@ -89,10 +89,10 @@ class CarbonMergerRDD[K, V](
   var rangeColumn: CarbonColumn = null
   var singleRange = false
   var expressionMapForRangeCol: util.Map[Integer, Expression] = null
-  var broadCastSplits: Broadcast[util.List[CarbonInputSplit]] = null
+  var broadCastSplits: Broadcast[CarbonInputSplitWrapper] = null
 
   def makeBroadCast(splits: util.List[CarbonInputSplit]): Unit = {
-    broadCastSplits = sparkContext.broadcast(splits)
+    broadCastSplits = sparkContext.broadcast(new CarbonInputSplitWrapper(splits))
   }
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
@@ -121,7 +121,7 @@ class CarbonMergerRDD[K, V](
           // all the splits)
           carbonSparkPartition.split.value.getAllSplits
         } else {
-          broadCastSplits.value
+          broadCastSplits.value.getInputSplit
         }
         val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
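
The Scala change is mechanical: the broadcast payload type becomes the
wrapper, and each consumer unwraps it with getInputSplit. For illustration,
the same round trip through a broadcast in Java (JavaSparkContext and the
local master are assumptions of this sketch; CarbonMergerRDD itself calls
sparkContext.broadcast directly from Scala):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.hadoop.CarbonInputSplit;
    import org.apache.carbondata.hadoop.CarbonInputSplitWrapper;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class BroadcastSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("broadcast-sketch").setMaster("local[*]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          List<CarbonInputSplit> splits = new ArrayList<>(); // stand-in
          // Driver side: ship the byte[]-backed wrapper, not the splits.
          Broadcast<CarbonInputSplitWrapper> bc =
              jsc.broadcast(new CarbonInputSplitWrapper(splits));
          // Task side: unwrap once and reuse the rebuilt list.
          List<CarbonInputSplit> splitList = bc.value().getInputSplit();
          System.out.println("splits restored: " + splitList.size());
        }
      }
    }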