Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/09 07:58:14 UTC
[12/35] carbondata git commit: [REBASE] Solve conflict after rebasing master
[REBASE] Solve conflict after rebasing master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/03a77694
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/03a77694
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/03a77694
Branch: refs/heads/carbonstore-rebase
Commit: 03a77694404ca13dc57f260051dc61d3c58964ad
Parents: 129f02e
Author: Jacky Li <ja...@qq.com>
Authored: Thu Feb 1 00:25:31 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Feb 9 15:55:52 2018 +0800
----------------------------------------------------------------------
.../hadoop/util/CarbonInputFormatUtil.java | 20 +++++++++++++++++++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 21 ++------------------
.../org/apache/spark/sql/CarbonSession.scala | 5 ++---
3 files changed, 24 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/03a77694/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 514428b..056c27b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -22,6 +22,8 @@ import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Locale;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -39,6 +41,7 @@ import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
@@ -159,4 +162,21 @@ public class CarbonInputFormatUtil {
String jobtrackerID = createJobTrackerID(date);
return new JobID(jobtrackerID, batch);
}
+
+ public static void setS3Configurations(Configuration hadoopConf) {
+ FileFactory.getConfiguration()
+ .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""));
+ FileFactory.getConfiguration()
+ .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""));
+ FileFactory.getConfiguration()
+ .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""));
+ FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""));
+ FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""));
+ FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""));
+ FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
+ hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""));
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/03a77694/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 917fc88..e17824f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,10 +41,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -371,7 +371,7 @@ class NewDataFrameLoaderRDD[K, V](
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val hadoopConf = getConf
- setS3Configurations(hadoopConf)
+ CarbonInputFormatUtil.setS3Configurations(hadoopConf)
val iter = new Iterator[(K, V)] {
val loadMetadataDetails = new LoadMetadataDetails()
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
@@ -441,23 +441,6 @@ class NewDataFrameLoaderRDD[K, V](
iter
}
override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
-
- private def setS3Configurations(hadoopConf: Configuration): Unit = {
- FileFactory.getConfiguration
- .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""))
- FileFactory.getConfiguration
- .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""))
- FileFactory.getConfiguration
- .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""))
- FileFactory.getConfiguration.set(CarbonCommonConstants.S3_ACCESS_KEY,
- hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""))
- FileFactory.getConfiguration.set(CarbonCommonConstants.S3_SECRET_KEY,
- hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""))
- FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_ACCESS_KEY,
- hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""))
- FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_SECRET_KEY,
- hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""))
- }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/03a77694/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index ded8f35..28471f0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -21,7 +21,6 @@ import java.io.File
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
@@ -31,8 +30,8 @@ import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.util.{CarbonReflectionUtils, Utils}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
/**
* Session implementation for {org.apache.spark.sql.SparkSession}
@@ -154,7 +153,7 @@ object CarbonSession {
sparkConf.setAppName(randomAppName)
}
val sc = SparkContext.getOrCreate(sparkConf)
- setS3Configurations(sc)
+ CarbonInputFormatUtil.setS3Configurations(sc.hadoopConfiguration)
// maybe this is an existing SparkContext, update its SparkConf which maybe used
// by SparkSession
options.foreach { case (k, v) => sc.conf.set(k, v) }
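----------------------------------------------------------------------
The diff above consolidates the S3 credential propagation logic: the private
setS3Configurations helper is removed from NewCarbonDataLoadRDD and replaced by
a shared static method CarbonInputFormatUtil.setS3Configurations(Configuration),
which both CarbonSession and NewDataFrameLoaderRDD now call. A minimal usage
sketch follows; the standalone object and placeholder credential values are
illustrative only and are not part of this commit (in the commit the
Configuration comes from sc.hadoopConfiguration or getConf).

// Illustrative sketch, not part of the commit.
import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

object S3ConfigSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical Hadoop configuration carrying S3 credentials.
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.s3a.access.key", "<access-key>")
    hadoopConf.set("fs.s3a.secret.key", "<secret-key>")
    hadoopConf.set("fs.s3a.endpoint", "<endpoint>")

    // Copies the fs.s3a.* keys (and the S3/S3N keys defined in
    // CarbonCommonConstants) into FileFactory's global Configuration
    // so CarbonData's file I/O can reach the S3 store.
    CarbonInputFormatUtil.setS3Configurations(hadoopConf)
  }
}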