Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:29 UTC
[04/50] [abbrv] carbondata git commit: [CARBONDATA-2489] Coverity scan fixes
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 7ddd181..ffe1aef 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -49,7 +49,7 @@ public class IntegerStreamReader extends AbstractStreamReader {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- if(isDictionary) {
+ if (isDictionary) {
populateDictionaryVector(type, numberOfRows, builder);
} else {
if (columnVector.anyNullsSet()) {
@@ -62,10 +62,8 @@ public class IntegerStreamReader extends AbstractStreamReader {
} else {
numberOfRows = streamData.length;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
- if (streamData != null) {
- for (int i = 0; i < numberOfRows; i++) {
- type.writeLong(builder, ((Integer) streamData[i]).longValue());
- }
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeLong(builder, ((Integer) streamData[i]).longValue());
}
}
return builder.build();
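For context: besides the spacing fix, this hunk removes a Coverity-flagged redundant null check. streamData.length is dereferenced two lines before the old if (streamData != null) guard, so the guard could never be false and was dead code. A minimal standalone Java sketch of the pattern (names here are illustrative, not from CarbonData); the same dead-guard removal is applied to the Long, Object, Short, Slice and Timestamp readers below:

final class RedundantNullCheck {
  static long sum(Integer[] data) {
    int n = data.length;  // dereference: data must already be non-null here
    long total = 0;
    // Before the fix, an if (data != null) wrapped this loop -- dead code,
    // since a null data would have thrown NullPointerException above.
    for (int i = 0; i < n; i++) {
      total += data[i].longValue();
    }
    return total;
  }
}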
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 015ac80..e1000c5 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -49,7 +49,7 @@ public class LongStreamReader extends AbstractStreamReader {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- if(isDictionary) {
+ if (isDictionary) {
populateDictionaryVector(type, numberOfRows, builder);
}
if (columnVector.anyNullsSet()) {
@@ -61,10 +61,8 @@ public class LongStreamReader extends AbstractStreamReader {
} else {
numberOfRows = streamData.length;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
- if (streamData != null) {
- for (int i = 0; i < numberOfRows; i++) {
- type.writeLong(builder, (Long) streamData[i]);
- }
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeLong(builder, (Long) streamData[i]);
}
}
return builder.build();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
index 82d62ad..8952712 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
@@ -41,32 +41,26 @@ public class ObjectStreamReader extends AbstractStreamReader {
* @return
* @throws IOException
*/
- public Block readBlock(Type type)
- throws IOException
- {
+ public Block readBlock(Type type) throws IOException {
int numberOfRows = 0;
BlockBuilder builder = null;
- if(isVectorReader) {
+ if (isVectorReader) {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- for(int i = 0; i < numberOfRows ; i++ ){
+ for (int i = 0; i < numberOfRows; i++) {
type.writeObject(builder, columnVector.getData(i));
}
}
-
} else {
numberOfRows = streamData.length;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
- if (streamData != null) {
- for(int i = 0; i < numberOfRows ; i++ ){
- type.writeObject(builder, streamData[i]);
- }
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeObject(builder, streamData[i]);
}
}
return builder.build();
-
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 00e5485..51f1cd5 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -49,7 +49,7 @@ public class ShortStreamReader extends AbstractStreamReader {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- if(isDictionary) {
+ if (isDictionary) {
populateDictionaryVector(type, numberOfRows, builder);
} else {
if (columnVector.anyNullsSet()) {
@@ -59,13 +59,11 @@ public class ShortStreamReader extends AbstractStreamReader {
}
}
}
- } else {
+ } else {
numberOfRows = streamData.length;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
- if (streamData != null) {
- for (int i = 0; i < numberOfRows; i++) {
- type.writeLong(builder, (Short) streamData[i]);
- }
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeLong(builder, (Short) streamData[i]);
}
}
return builder.build();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index d98afa3..cce35e0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -72,7 +72,7 @@ public class SliceStreamReader extends AbstractStreamReader {
}
return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values);
} else {
- if(columnVector.anyNullsSet()) {
+ if (columnVector.anyNullsSet()) {
handleNullInVector(type, numberOfRows, builder);
} else {
populateVector(type, numberOfRows, builder);
@@ -82,10 +82,8 @@ public class SliceStreamReader extends AbstractStreamReader {
} else {
numberOfRows = streamData.length;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
- if (streamData != null) {
- for (int i = 0; i < numberOfRows; i++) {
- type.writeSlice(builder, utf8Slice(streamData[i].toString()));
- }
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeSlice(builder, utf8Slice(streamData[i].toString()));
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 01b7939..a22ef29 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -39,10 +39,9 @@ public class TimestampStreamReader extends AbstractStreamReader {
numberOfRows = batchSize;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
if (columnVector != null) {
- if(columnVector.anyNullsSet()) {
+ if (columnVector.anyNullsSet()) {
handleNullInVector(type, numberOfRows, builder);
- }
- else {
+ } else {
populateVector(type, numberOfRows, builder);
}
}
@@ -50,10 +49,8 @@ public class TimestampStreamReader extends AbstractStreamReader {
} else {
numberOfRows = streamData.length;
builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
- if (streamData != null) {
- for (int i = 0; i < numberOfRows; i++) {
- type.writeLong(builder, (Long) streamData[i]);
- }
+ for (int i = 0; i < numberOfRows; i++) {
+ type.writeLong(builder, (Long) streamData[i]);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index d45e759..49aa7ff 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -17,9 +17,7 @@
package org.apache.spark.sql.common.util
-import java.io.{FileInputStream, ObjectInputStream, ObjectOutputStream}
-import java.math
-import java.math.RoundingMode
+import java.io.{ObjectInputStream, ObjectOutputStream}
import java.util.{Locale, TimeZone}
import org.apache.carbondata.common.logging.LogServiceFactory
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
index 1e98ec2..f4948c4 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/dictionary/server/SecureDictionaryServer.java
@@ -38,7 +38,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.sasl.SaslServerBootstrap;
-import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.util.TransportConf;
import scala.Some;
@@ -144,8 +143,7 @@ public class SecureDictionaryServer extends AbstractDictionaryServer implements
TransportServerBootstrap bootstrap =
new SaslServerBootstrap(transportConf, securityManager);
String host = findLocalIpAddress(LOGGER);
- TransportServer transportServer = context
- .createServer(host, port, Lists.<TransportServerBootstrap>newArrayList(bootstrap));
+ context.createServer(host, port, Lists.<TransportServerBootstrap>newArrayList(bootstrap));
LOGGER.audit("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start)
+ " Listening on port " + newPort);
this.port = newPort;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
deleted file mode 100644
index 9936a2a..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.Value
-
-class CarbonCleanFilesRDD[V: ClassTag](
- sc: SparkContext,
- valueClass: Value[V],
- databaseName: String,
- tableName: String,
- partitioner: Partitioner)
- extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
- }
-
- override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
- val iter = new Iterator[(V)] {
- val split = theSplit.asInstanceOf[CarbonLoadPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- // TODO call CARBON delete API
-
-
- var havePair = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = true
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- valueClass.getValue(null)
- }
-
- }
- iter
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonLoadPartition]
- val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name: " + s.head + s.length)
- s
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
deleted file mode 100644
index b11dfad..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.DeletedLoadResult
-
-class CarbonDeleteLoadByDateRDD[K, V](
- sc: SparkContext,
- result: DeletedLoadResult[K, V],
- databaseName: String,
- tableName: String,
- dateField: String,
- dateFieldActualName: String,
- dateValue: String,
- factTableName: String,
- dimTableName: String,
- storePath: String,
- loadMetadataDetails: List[LoadMetadataDetails])
- extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map {s =>
- new CarbonLoadPartition(id, s._2, s._1)
- }
- }
-
- override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- new Iterator[(K, V)] {
- val split = theSplit.asInstanceOf[CarbonLoadPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
-
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- val partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
- // TODO call CARBON delete API
- logInfo("Applying data retention as per date value " + dateValue)
- var dateFormat = ""
- try {
- dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
- } catch {
- case e: Exception => logInfo("Unable to parse with default time format " + dateValue)
- }
- // TODO: Implement it
- val finished = false
-
- override def hasNext: Boolean = {
- finished
- }
-
- override def next(): (K, V) = {
- result.getKey(null, null)
- }
- }
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonLoadPartition]
- val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name: " + s.head + s.length)
- s
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
deleted file mode 100644
index 759ed42..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.Value
-
-class CarbonDeleteLoadRDD[V: ClassTag](
- sc: SparkContext,
- valueClass: Value[V],
- loadId: Int,
- databaseName: String,
- tableName: String,
- partitioner: Partitioner)
- extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map {f =>
- new CarbonLoadPartition(id, f._2, f._1)
- }
- }
-
- override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
- val iter = new Iterator[V] {
- val split = theSplit.asInstanceOf[CarbonLoadPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- // TODO call CARBON delete API
-
- var havePair = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = true
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- valueClass.getValue(null)
- }
-
- }
- logInfo("********Deleting***************")
- iter
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonLoadPartition]
- val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
- logInfo("Host Name: " + s.head + s.length)
- s
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
deleted file mode 100644
index f327d88..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-
-import org.apache.carbondata.processing.util.CarbonQueryUtil
-import org.apache.carbondata.spark.Value
-
-class CarbonDropTableRDD[V: ClassTag](
- sc: SparkContext,
- valueClass: Value[V],
- databaseName: String,
- tableName: String)
- extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
- override def getPartitions: Array[Partition] = {
- val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
- splits.zipWithIndex.map { s =>
- new CarbonLoadPartition(id, s._2, s._1)
- }
- }
-
- override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-
- val iter = new Iterator[V] {
- // TODO: Clear Btree from memory
-
- var havePair = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = true
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- valueClass.getValue(null)
- }
- }
- iter
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index c2c4ab3..5c3ace3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -153,6 +153,15 @@ object QueryTest {
Row.fromSeq(s.toSeq.map {
case d: java.math.BigDecimal => BigDecimal(d)
case b: Array[Byte] => b.toSeq
+ case d : Double =>
+ if (!d.isInfinite && !d.isNaN) {
+ var bd = BigDecimal(d)
+ bd = bd.setScale(5, BigDecimal.RoundingMode.UP)
+ bd.doubleValue()
+ }
+ else {
+ d
+ }
case o => o
})
}
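For context: this hunk makes row comparison tolerant of floating-point noise by rounding finite doubles to five decimal places before comparing, while letting NaN and infinities pass through (BigDecimal cannot represent them). A rough Java equivalent of the Scala logic above, offered only as a sketch:

import java.math.BigDecimal;
import java.math.RoundingMode;

final class DoubleNormalizer {
  // Finite doubles are normalised to 5 decimal places, rounding up;
  // NaN/Infinity are returned unchanged, as in the Scala case clause.
  static double normalize(double d) {
    if (Double.isInfinite(d) || Double.isNaN(d)) {
      return d;
    }
    return BigDecimal.valueOf(d).setScale(5, RoundingMode.UP).doubleValue();
  }
}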
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 903bf44..082ef8b 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.InputMetricsStats;
import org.apache.carbondata.spark.util.CarbonScalaUtil;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.spark.memory.MemoryMode;
@@ -134,24 +135,17 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
} catch (QueryExecutionException e) {
- Throwable ext = e;
- while (ext != null) {
- if (ext instanceof FileNotFoundException) {
- throw new InterruptedException(
- "Insert overwrite may be in progress.Please check " + e.getMessage());
- }
- ext = ext.getCause();
+ if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+ LOGGER.error(e);
+ throw new InterruptedException(
+ "Insert overwrite may be in progress.Please check " + e.getMessage());
}
throw new InterruptedException(e.getMessage());
} catch (Exception e) {
- Throwable ext = e;
- while (ext != null) {
- if (ext instanceof FileNotFoundException) {
- LOGGER.error(e);
- throw new InterruptedException(
- "Insert overwrite may be in progress.Please check " + e.getMessage());
- }
- ext = ext.getCause();
+ if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+ LOGGER.error(e);
+ throw new InterruptedException(
+ "Insert overwrite may be in progress.Please check " + e.getMessage());
}
throw e;
}
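For context: the two hand-written while-loops that walked e.getCause() chains looking for a FileNotFoundException are replaced by commons-lang3's ExceptionUtils.indexOfThrowable, which returns the zero-based index of the first matching throwable in the cause chain, or -1 if none matches. A minimal sketch, assuming commons-lang3 on the classpath (the helper name is illustrative):

import java.io.FileNotFoundException;
import org.apache.commons.lang3.exception.ExceptionUtils;

final class CauseChainCheck {
  static boolean causedByMissingFile(Exception e) {
    // Replaces: while (ext != null) { if (ext instanceof FileNotFoundException) ... }
    // Note the return value is an index: 0 means the outermost throwable matched.
    return ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) >= 0;
  }
}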
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
index d668329..58ec0d5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
@@ -24,8 +24,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -47,7 +47,7 @@ public class BadRecordsLogger {
* the status
*/
private static Map<String, String> badRecordEntry =
- new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ new ConcurrentHashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
/**
* File Name
*/
@@ -121,6 +121,9 @@ public class BadRecordsLogger {
public void addBadRecordsToBuilder(Object[] row, String reason)
throws CarbonDataLoadingException {
+ // setting partial success entry since even if bad records are there then load
+ // status should be partial success regardless of bad record logged
+ badRecordEntry.put(taskKey, "Partially");
if (badRecordsLogRedirect || badRecordLoggerEnable) {
StringBuilder logStrings = new StringBuilder();
int size = row.length;
@@ -158,10 +161,6 @@ public class BadRecordsLogger {
}
writeBadRecordsToFile(logStrings);
}
- } else {
- // setting partial success entry since even if bad records are there then load
- // status should be partial success regardless of bad record logged
- badRecordEntry.put(taskKey, "Partially");
}
}
@@ -200,11 +199,6 @@ public class BadRecordsLogger {
} catch (IOException e) {
LOGGER.error("Error While writing bad record log File");
throw new CarbonDataLoadingException("Error While writing bad record log File", e);
- } finally {
- // if the Bad record file is created means it partially success
- // if any entry present with key that means its have bad record for
- // that key
- badRecordEntry.put(taskKey, "Partially");
}
}
@@ -246,9 +240,6 @@ public class BadRecordsLogger {
LOGGER.error("Error While writing bad record csv File");
throw new CarbonDataLoadingException("Error While writing bad record csv File", e);
}
- finally {
- badRecordEntry.put(taskKey, "Partially");
- }
}
public boolean isBadRecordConvertNullDisable() {
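For context: badRecordEntry is a static map written to by concurrent load tasks, so the plain HashMap (unsafe under concurrent mutation) becomes a ConcurrentHashMap, and the "Partially" status is recorded unconditionally up front instead of from several finally blocks. A small sketch of the thread-safety half of the change (names are illustrative):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LoadStatusRegistry {
  // ConcurrentHashMap makes put/containsKey safe without external locking.
  private static final Map<String, String> STATUS = new ConcurrentHashMap<>();

  static void markPartialSuccess(String taskKey) {
    STATUS.put(taskKey, "Partially");
  }

  static boolean hasBadRecords(String taskKey) {
    return STATUS.containsKey(taskKey);
  }
}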
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
index ed35a96..12f6927 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ThreadStatusObserver.java
@@ -39,17 +39,18 @@ public class ThreadStatusObserver {
// should assign the throwable object else the actual cause for failure can be overridden as
// all the running threads will throw interrupted exception on calling shutdownNow and
// will override the throwable object
- if (null == this.throwable) {
- synchronized (lock) {
- if (null == this.throwable) {
- executorService.shutdownNow();
- this.throwable = throwable;
- }
+ synchronized (lock) {
+ if (null == this.throwable) {
+ executorService.shutdownNow();
+ this.throwable = throwable;
}
}
}
public Throwable getThrowable() {
- return throwable;
+
+ synchronized (lock) {
+ return throwable;
+ }
}
}
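For context: the old code used double-checked locking on a non-volatile field, so one thread's write to throwable was not guaranteed visible to another thread's unlocked read. The fix performs both the null check and the assignment inside the monitor, and reads under the same monitor. A condensed sketch of the corrected shape (an AtomicReference would be an alternative design):

final class FailureObserver {
  private final Object lock = new Object();
  private Throwable throwable;

  void notifyFailed(Throwable t) {
    synchronized (lock) {
      if (throwable == null) {  // keep only the first failure
        throwable = t;
      }
    }
  }

  Throwable getThrowable() {
    synchronized (lock) {  // read under the same lock for visibility
      return throwable;
    }
  }
}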
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 80887c1..cb72f54 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -164,7 +164,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
LOGGER.error(e);
this.threadStatusObserver.notifyFailed(e);
} finally {
- sortDataRows.finishThread();
+ synchronized (sortDataRows) {
+ sortDataRows.finishThread();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 1c6ce8d..fb0bcc3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -109,7 +109,6 @@ public class UnsafeSortDataRows {
// observer of writing file in thread
this.threadStatusObserver = new ThreadStatusObserver();
this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
- this.inMemoryChunkSize = inMemoryChunkSize;
this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L;
enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT,
@@ -121,7 +120,7 @@ public class UnsafeSortDataRows {
// in sort memory size.
this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2;
} else {
- this.maxSizeAllowed = this.maxSizeAllowed * 1024 * 1024;
+ this.maxSizeAllowed = this.maxSizeAllowed * 1024L * 1024L;
}
}
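For context: this hunk drops a redundant assignment to inMemoryChunkSize that was immediately overwritten, and fixes a potential integer overflow: if the multiplication chain is evaluated in int arithmetic, sizeInMb * 1024 * 1024 overflows from 2048 upward, while writing 1024L forces 64-bit arithmetic. A tiny sketch:

final class SizeConversion {
  static long toBytes(int sizeMb) {
    return sizeMb * 1024L * 1024L;  // long arithmetic throughout
  }

  public static void main(String[] args) {
    System.out.println(2048 * 1024 * 1024);  // int overflow: -2147483648
    System.out.println(toBytes(2048));       // correct:       2147483648
  }
}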
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
index 01e7649..a65de16 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.AbstractQueue;
+import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -215,7 +216,11 @@ public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> {
* @throws CarbonSortKeyAndGroupByException
*/
private UnsafeCarbonRowForMerge next() throws CarbonSortKeyAndGroupByException {
- return getSortedRecordFromMemory();
+ if (hasNext()) {
+ return getSortedRecordFromMemory();
+ } else {
+ throw new NoSuchElementException("No more elements to return");
+ }
}
/**
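For context: Coverity flags next() implementations that can fall through on an exhausted source. The fix brings next() in line with the java.util.Iterator contract: guard with hasNext() and throw NoSuchElementException when nothing remains. A minimal sketch of the guarded shape (state and names are illustrative); the same guard is applied to UnsafeIntermediateFileMerger, UnsafeSingleThreadFinalSortFilesMerger, IntermediateFileMerger and SingleThreadFinalSortFilesMerger below:

import java.util.NoSuchElementException;

final class GuardedReader {
  private int remaining = 3;  // stand-in for the merger's record source

  boolean hasNext() {
    return remaining > 0;
  }

  int next() {
    if (!hasNext()) {
      throw new NoSuchElementException("No more elements to return");
    }
    return remaining--;  // stand-in for getSortedRecordFromMemory()
  }
}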
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 22673ff..c5b215e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.AbstractQueue;
+import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
@@ -238,7 +239,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
* @throws CarbonSortKeyAndGroupByException
*/
private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
- return getSortedRecordFromFile();
+ if (hasNext()) {
+ return getSortedRecordFromFile();
+ } else {
+ throw new NoSuchElementException("No more elements to return");
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
index 0c692c7..8d2c52a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateMerger.java
@@ -38,6 +38,8 @@ import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.commons.collections.list.SynchronizedList;
+
/**
* It does mergesort intermediate files to big file.
*/
@@ -76,7 +78,8 @@ public class UnsafeIntermediateMerger {
this.mergedPages = new ArrayList<>();
this.executorService = Executors.newFixedThreadPool(parameters.getNumberOfCores(),
new CarbonThreadFactory("UnsafeIntermediatePool:" + parameters.getTableName()));
- this.procFiles = new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ this.procFiles =
+ SynchronizedList.decorate(new ArrayList<File>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
this.mergerTask = new ArrayList<>();
Integer spillPercentage;
@@ -111,15 +114,18 @@ public class UnsafeIntermediateMerger {
}
public void startFileMergingIfPossible() {
- File[] fileList;
- if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
- synchronized (lockObject) {
+ File[] fileList = null;
+ synchronized (lockObject) {
+ if (procFiles.size() >= parameters.getNumberOfIntermediateFileToBeMerged()) {
fileList = procFiles.toArray(new File[procFiles.size()]);
this.procFiles = new ArrayList<File>();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER
+ .debug("Submitting request for intermediate merging no of files: " + fileList.length);
+ }
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
- }
+ }
+ if (null != fileList) {
startIntermediateMerging(fileList);
}
}
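For context: two changes here. procFiles is wrapped in commons-collections' SynchronizedList so individual operations are thread-safe, and the size check in startFileMergingIfPossible moves inside the synchronized block, because testing procFiles.size() outside the lock and swapping the list inside it was a check-then-act race. A condensed sketch of the corrected locking (names are illustrative):

import java.util.ArrayList;
import java.util.List;

final class MergeScheduler {
  private final Object lock = new Object();
  private List<String> pending = new ArrayList<>();

  // Threshold test and list swap happen under one monitor; the merge itself
  // runs outside the lock on the drained snapshot.
  String[] drainIfReady(int threshold) {
    synchronized (lock) {
      if (pending.size() < threshold) {
        return null;
      }
      String[] batch = pending.toArray(new String[0]);
      pending = new ArrayList<>();
      return batch;
    }
  }
}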
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 073d13b..6defeb7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import org.apache.carbondata.common.CarbonIterator;
@@ -195,8 +196,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
* @return sorted row
*/
public Object[] next() {
- IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
- return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+ if (hasNext()) {
+ IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+ return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+ } else {
+ throw new NoSuchElementException("No more elements to return");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index 4078a13..d0e78fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -96,6 +96,10 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
// to be launched.
int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+ if (parallelThreadNumber <= 0) {
+ parallelThreadNumber = 1;
+ }
+
List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
for (int i = 0; i < parallelThreadNumber; i++) {
iterators[i] = new ArrayList<>();
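For context: Math.min(inputIterators.length, numberOfCores) can evaluate to zero when there are no input iterators, and Executors.newFixedThreadPool rejects a non-positive thread count with IllegalArgumentException, so the count is clamped to at least one. The same guard as a one-method sketch:

final class Parallelism {
  static int threads(int iteratorCount, int cores) {
    // never hand 0 (or a negative count) to newFixedThreadPool
    return Math.max(1, Math.min(iteratorCount, cores));
  }
}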
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 1744675..81031de 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1108,23 +1108,24 @@ public final class CarbonDataMergerUtil {
CarbonFile[] deleteDeltaFiles =
segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
+ if (null != deleteDeltaFiles) {
+ // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
+ // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
+ // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate
+ // Spill Over Blocks will choose files with unique taskID.
+ for (CarbonFile blocks : deleteDeltaFiles) {
+ // Get Task ID and the Timestamp from the Block name for e.g.
+ // part-0-3-1481084721319.carbondata => "3-1481084721319"
+ String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
+ String timestamp =
+ CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
+ String taskAndTimeStamp = task + "-" + timestamp;
+ uniqueBlocks.add(taskAndTimeStamp);
+ }
- // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
- // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
- // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate
- // Spill Over Blocks will choose files with unique taskID.
- for (CarbonFile blocks : deleteDeltaFiles) {
- // Get Task ID and the Timestamp from the Block name for e.g.
- // part-0-3-1481084721319.carbondata => "3-1481084721319"
- String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
- String timestamp =
- CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
- String taskAndTimeStamp = task + "-" + timestamp;
- uniqueBlocks.add(taskAndTimeStamp);
- }
-
- if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
- return true;
+ if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
+ return true;
+ }
}
}
return false;
@@ -1152,7 +1153,7 @@ public final class CarbonDataMergerUtil {
CarbonFile[] deleteDeltaFiles =
segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
- if (deleteDeltaFiles.length > numberDeltaFilesThreshold) {
+ if (null != deleteDeltaFiles && (deleteDeltaFiles.length > numberDeltaFilesThreshold)) {
blockLists.add(seg.getSegmentNo() + "/" + blockName);
}
}
@@ -1200,31 +1201,34 @@ public final class CarbonDataMergerUtil {
String destFileName =
blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
- String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
- + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
-
- List<String> deleteFilePathList = new ArrayList<String>();
- for (CarbonFile cFile : deleteDeltaFiles) {
- deleteFilePathList.add(cFile.getCanonicalPath());
- }
+ List<String> deleteFilePathList = new ArrayList<>();
+ if (null != deleteDeltaFiles && deleteDeltaFiles.length > 0 && null != deleteDeltaFiles[0]
+ .getParentFile()) {
+ String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
+ + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
+
+ for (CarbonFile cFile : deleteDeltaFiles) {
+ deleteFilePathList.add(cFile.getCanonicalPath());
+ }
- CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
- blockDetails.setBlockName(blockName);
- blockDetails.setSegmentName(seg);
- blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString());
- blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString());
+ CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
+ blockDetails.setBlockName(blockName);
+ blockDetails.setSegmentName(seg);
+ blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString());
+ blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString());
- try {
- if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) {
- blockDetails.setCompactionStatus(true);
- } else {
- blockDetails.setCompactionStatus(false);
+ try {
+ if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) {
+ blockDetails.setCompactionStatus(true);
+ } else {
+ blockDetails.setCompactionStatus(false);
+ }
+ resultList.add(blockDetails);
+ } catch (IOException e) {
+ LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is "
+ + fullBlockFilePath);
+ throw new IOException();
}
- resultList.add(blockDetails);
- } catch (IOException e) {
- LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is "
- + fullBlockFilePath);
- throw new IOException();
}
return resultList;
}
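For context: getDeleteDeltaFilesList may yield null rather than an empty array, and the old code both iterated the result and dereferenced deleteDeltaFiles[0].getParentFile() unconditionally. The fix wraps each use in explicit null/length guards. A small sketch of the defensive pattern (names are illustrative):

import java.io.File;
import java.util.Arrays;
import java.util.HashSet;

final class DeltaFileGuard {
  static int countUnique(File[] files) {
    if (files == null || files.length == 0) {
      return 0;  // avoids an NPE on files[0] and on the for-each loop
    }
    return new HashSet<>(Arrays.asList(files)).size();
  }
}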
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 442f1c5..9a3258e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -138,21 +138,23 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
// if record holder is not empty then iterator the slice holder from
// heap
iterator = this.recordHolderHeap.poll();
- while (true) {
- Object[] convertedRow = iterator.next();
- if (null == convertedRow) {
- iterator.close();
- break;
- }
- // do it only once
- if (!isDataPresent) {
- dataHandler.initialise();
- isDataPresent = true;
- }
- addRow(convertedRow);
- // check if leaf contains no record
- if (!iterator.hasNext()) {
- break;
+ if (null != iterator) {
+ while (true) {
+ Object[] convertedRow = iterator.next();
+ if (null == convertedRow) {
+ iterator.close();
+ break;
+ }
+ // do it only once
+ if (!isDataPresent) {
+ dataHandler.initialise();
+ isDataPresent = true;
+ }
+ addRow(convertedRow);
+ // check if leaf contains no record
+ if (!iterator.hasNext()) {
+ break;
+ }
}
}
if (isDataPresent)
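For context: recordHolderHeap is polled like a queue, and Queue.poll() returns null when the queue is empty, so the merge loop is now entered only for a non-null iterator. A tiny sketch of the poll-then-guard pattern:

import java.util.PriorityQueue;

final class HeapDrain {
  static long sum(PriorityQueue<Long> heap) {
    long total = 0;
    Long head = heap.poll();
    while (head != null) {  // poll() signals an empty queue with null
      total += head;
      head = heap.poll();
    }
    return total;
  }
}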
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
deleted file mode 100644
index b6f12a5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.partition.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.processing.partition.DataPartitioner;
-import org.apache.carbondata.processing.partition.Partition;
-
-
-public final class QueryPartitionHelper {
- private static QueryPartitionHelper instance = new QueryPartitionHelper();
- private Map<String, DataPartitioner> partitionerMap =
- new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- private Map<String, DefaultLoadBalancer> loadBalancerMap =
- new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- private QueryPartitionHelper() {
-
- }
-
- public static QueryPartitionHelper getInstance() {
- return instance;
- }
-
- /**
- * Get partitions applicable for query based on filters applied in query
- */
- public List<Partition> getPartitionsForQuery(String databaseName, String tableName) {
- String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
-
- DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
- return dataPartitioner.getPartitions();
- }
-
- public List<Partition> getAllPartitions(String databaseName, String tableName) {
- String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
-
- DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
- return dataPartitioner.getAllPartitions();
- }
-
- /**
- * Get the node name where the partition is assigned to.
- */
- public String getLocation(Partition partition, String databaseName, String tableName) {
- String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName);
-
- DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
- return loadBalancer.getNodeForPartitions(partition);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index c06819c..364515c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.AbstractQueue;
+import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
@@ -237,7 +238,11 @@ public class IntermediateFileMerger implements Callable<Void> {
* @throws CarbonSortKeyAndGroupByException
*/
private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
- return getSortedRecordFromFile();
+ if (hasNext()) {
+ return getSortedRecordFromFile();
+ } else {
+ throw new NoSuchElementException("No more elements to return");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 1a839a2..09c1920 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -23,6 +23,7 @@ import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -233,8 +234,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
* @throws CarbonSortKeyAndGroupByException
*/
public Object[] next() {
- IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
- return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+ if (hasNext()) {
+ IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+ return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+ } else {
+ throw new NoSuchElementException("No more elements to return");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 8115f97..9d0c933 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -43,7 +43,6 @@ import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonMergerUtil;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -56,8 +55,6 @@ import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.commons.lang3.ArrayUtils;
-
public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
private static final LogService LOGGER =
@@ -168,13 +165,13 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
this.blockSizeThreshold =
fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
- LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " +
- blockSizeThreshold);
+ LOGGER
+ .info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + blockSizeThreshold);
// whether to directly write fact data to HDFS
- String directlyWriteData2Hdfs = propInstance.getProperty(
- CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
- CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
+ String directlyWriteData2Hdfs = propInstance
+ .getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+ CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
this.enableDirectlyWriteData2Hdfs = "TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
if (enableDirectlyWriteData2Hdfs) {
@@ -189,22 +186,9 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
// in case of compaction we will pass the cardinality.
this.localCardinality = this.model.getColCardinality();
- //TODO: We should delete the levelmetadata file after reading here.
- // so only data loading flow will need to read from cardinality file.
- if (null == this.localCardinality) {
- this.localCardinality = CarbonMergerUtil
- .getCardinalityFromLevelMetadata(this.model.getStoreLocation(),
- this.model.getTableName());
- List<Integer> cardinalityList = new ArrayList<Integer>();
- thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
- this.model.getWrapperColumnSchema());
- localCardinality =
- ArrayUtils.toPrimitive(cardinalityList.toArray(new Integer[cardinalityList.size()]));
- } else { // for compaction case
- List<Integer> cardinalityList = new ArrayList<Integer>();
- thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
- this.model.getWrapperColumnSchema());
- }
+ List<Integer> cardinalityList = new ArrayList<Integer>();
+ thriftColumnSchemaList = getColumnSchemaListAndCardinality(cardinalityList, localCardinality,
+ this.model.getWrapperColumnSchema());
this.numberCompressor = new NumberCompressor(Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
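[Editor's sketch] Two findings drive the AbstractFactDataWriter hunks: the branch for a null local cardinality was dropped as dead code (both arms ended in the same getColumnSchemaListAndCardinality call, which also leaves the CarbonMergerUtil and ArrayUtils imports unused), and the direct-HDFS-write flag is read through a property lookup with a default. A hedged sketch of that flag-parsing pattern follows, using plain java.util.Properties as a stand-in for CarbonProperties and an illustrative key name rather than the real constants.

    import java.util.Properties;

    public class FlagExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Key and default mirror the shape of the real lookup; the literal
        // strings here are illustrative, not the CarbonData constant values.
        String raw = props.getProperty("carbon.load.direct.write.hdfs", "false");
        // Constant-first equalsIgnoreCase is null-safe and case-tolerant.
        boolean enabled = "TRUE".equalsIgnoreCase(raw);
        System.out.println("direct HDFS write enabled: " + enabled);
      }
    }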
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
index 4abdf3c..8e23489 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
@@ -17,19 +17,8 @@
package org.apache.carbondata.processing.util;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.scan.model.QueryProjection;
-import org.apache.carbondata.processing.partition.Partition;
-import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer;
-import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.processing.partition.impl.QueryPartitionHelper;
-import org.apache.carbondata.processing.splits.TableSplit;
import org.apache.commons.lang3.StringUtils;
@@ -43,58 +32,6 @@ public class CarbonQueryUtil {
}
/**
- * It creates the one split for each region server.
- */
- public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
- QueryProjection queryPlan) {
-
- //Just create splits depends on locations of region servers
- List<Partition> allPartitions = null;
- if (queryPlan == null) {
- allPartitions =
- QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
- } else {
- allPartitions =
- QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName);
- }
- TableSplit[] splits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < splits.length; i++) {
- splits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location = QueryPartitionHelper.getInstance()
- .getLocation(partition, databaseName, tableName);
- locations.add(location);
- splits[i].setPartition(partition);
- splits[i].setLocations(locations);
- }
-
- return splits;
- }
-
- /**
- * It creates the one split for each region server.
- */
- public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) {
-
- //Just create splits depends on locations of region servers
- DefaultLoadBalancer loadBalancer = null;
- List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
- loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
- TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < tblSplits.length; i++) {
- tblSplits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location = loadBalancer.getNodeForPartitions(partition);
- locations.add(location);
- tblSplits[i].setPartition(partition);
- tblSplits[i].setLocations(locations);
- }
- return tblSplits;
- }
-
- /**
* split sourcePath by comma
*/
public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
@@ -104,21 +41,4 @@ public class CarbonQueryUtil {
Collections.addAll(partitionsFiles, files);
}
}
-
- private static List<Partition> getAllFilesForDataLoad(String sourcePath) {
- List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
- List<Partition> partitionList =
- new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
- partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
- partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
- for (int i = 0; i < files.size(); i++) {
- partitionFiles.get(0).add(files.get(i));
- }
- return partitionList;
- }
-
}
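[Editor's sketch] With the unused split-creation helpers removed, CarbonQueryUtil retains only splitFilePath(), which breaks a comma-separated source path into individual file entries. The self-contained approximation below is inferred from the visible hunk (the Collections.addAll call and the comma separator), so treat it as a sketch of the surviving behavior, not the exact CarbonData method.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SplitExample {
      // Inferred shape of splitFilePath(): split on the separator and
      // append every piece to the caller-supplied list.
      static void splitFilePath(String sourcePath, List<String> partitionsFiles,
          String separator) {
        if (sourcePath != null && !sourcePath.isEmpty()) {
          Collections.addAll(partitionsFiles, sourcePath.split(separator));
        }
      }

      public static void main(String[] args) {
        List<String> parts = new ArrayList<>();
        splitFilePath("/data/a.csv,/data/b.csv", parts, ",");
        System.out.println(parts);  // [/data/a.csv, /data/b.csv]
      }
    }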