Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 16:39:11 UTC

[25/50] carbondata git commit: [REBASE] Solve conflict after rebasing master

[REBASE] Solve conflict after rebasing master


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e5acea97
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e5acea97
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e5acea97

Branch: refs/heads/carbonstore-rebase4
Commit: e5acea971ab7bdf75634ccd292b9438a55ef461e
Parents: 6ea4cab
Author: Jacky Li <ja...@qq.com>
Authored: Thu Feb 1 00:25:31 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Feb 27 16:59:47 2018 +0800

----------------------------------------------------------------------
 .../hadoop/util/CarbonInputFormatUtil.java      | 20 +++++++++++++++++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 21 ++------------------
 .../org/apache/spark/sql/CarbonSession.scala    |  5 ++---
 3 files changed, 24 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5acea97/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 514428b..056c27b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -22,6 +22,8 @@ import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Locale;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -39,6 +41,7 @@ import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
@@ -159,4 +162,21 @@ public class CarbonInputFormatUtil {
     String jobtrackerID = createJobTrackerID(date);
     return new JobID(jobtrackerID, batch);
   }
+
+  public static void setS3Configurations(Configuration hadoopConf) {
+    FileFactory.getConfiguration()
+        .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""));
+    FileFactory.getConfiguration()
+        .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""));
+    FileFactory.getConfiguration()
+        .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""));
+    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY,
+        hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""));
+    FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY,
+        hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""));
+    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY,
+        hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""));
+    FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY,
+        hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""));
+  }
 }
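
For context, the new utility above simply mirrors the listed S3 keys from a
caller-supplied Hadoop Configuration into FileFactory's JVM-local
configuration. A minimal usage sketch in Scala (not part of this commit; the
object name and all key values are placeholders for illustration):

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

    object S3ConfigSketch {
      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        hadoopConf.set("fs.s3a.access.key", "MY_ACCESS_KEY")  // placeholder
        hadoopConf.set("fs.s3a.secret.key", "MY_SECRET_KEY")  // placeholder
        hadoopConf.set("fs.s3a.endpoint", "s3.example.com")   // placeholder

        // Copies the s3a/s3/s3n access and secret keys (and the s3a
        // endpoint) into FileFactory's configuration; any key missing
        // from hadoopConf defaults to "".
        CarbonInputFormatUtil.setS3Configurations(hadoopConf)
      }
    }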

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5acea97/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 917fc88..e17824f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,10 +41,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -371,7 +371,7 @@ class NewDataFrameLoaderRDD[K, V](
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val hadoopConf = getConf
-    setS3Configurations(hadoopConf)
+    CarbonInputFormatUtil.setS3Configurations(hadoopConf)
     val iter = new Iterator[(K, V)] {
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
@@ -441,23 +441,6 @@ class NewDataFrameLoaderRDD[K, V](
     iter
   }
   override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
-
-  private def setS3Configurations(hadoopConf: Configuration): Unit = {
-    FileFactory.getConfiguration
-      .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", ""))
-    FileFactory.getConfiguration
-      .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", ""))
-    FileFactory.getConfiguration
-      .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", ""))
-    FileFactory.getConfiguration.set(CarbonCommonConstants.S3_ACCESS_KEY,
-      hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, ""))
-    FileFactory.getConfiguration.set(CarbonCommonConstants.S3_SECRET_KEY,
-      hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, ""))
-    FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_ACCESS_KEY,
-      hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, ""))
-    FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_SECRET_KEY,
-     hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, ""))
-  }
 }
 
 /**
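
The load task now calls the shared utility instead of its deleted private
copy. A hedged sketch of the executor-side pattern this leaves in place; the
object and method names here (TaskS3Setup.configure) are hypothetical:

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

    object TaskS3Setup {
      // Hypothetical helper (not in this commit), marking where the shared
      // utility now runs: inside the task, on the executor JVM, so that
      // FileFactory's JVM-local configuration holds the S3 credentials
      // carried by the job configuration before the load's I/O begins.
      def configure(jobConf: Configuration): Unit =
        CarbonInputFormatUtil.setS3Configurations(jobConf)
    }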

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e5acea97/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 935b0a6..bf958f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -21,7 +21,6 @@ import java.io.File
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
@@ -31,8 +30,8 @@ import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.util.{CarbonReflectionUtils, Utils}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 /**
  * Session implementation for {org.apache.spark.sql.SparkSession}
@@ -154,7 +153,7 @@ object CarbonSession {
             sparkConf.setAppName(randomAppName)
           }
           val sc = SparkContext.getOrCreate(sparkConf)
-          setS3Configurations(sc)
+          CarbonInputFormatUtil.setS3Configurations(sc.hadoopConfiguration)
           // maybe this is an existing SparkContext, update its SparkConf which maybe used
           // by SparkSession
           options.foreach { case (k, v) => sc.conf.set(k, v) }
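
On the driver side, CarbonSession now forwards sc.hadoopConfiguration to the
same utility. A minimal sketch of how the S3 keys typically reach that
configuration, assuming Spark's standard "spark.hadoop." prefix passthrough;
the object name and all key values are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    object CarbonS3DriverSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("carbon-s3-sketch")
          .set("spark.hadoop.fs.s3a.access.key", "MY_ACCESS_KEY")  // placeholder
          .set("spark.hadoop.fs.s3a.secret.key", "MY_SECRET_KEY")  // placeholder
          .set("spark.hadoop.fs.s3a.endpoint", "s3.example.com")   // placeholder

        // Spark copies "spark.hadoop.*" entries into sc.hadoopConfiguration,
        // which CarbonSession (per the diff above) passes to
        // CarbonInputFormatUtil.setS3Configurations.
        val sc = SparkContext.getOrCreate(sparkConf)
      }
    }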