Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/11 12:16:16 UTC
[2/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index ff2ffdd..809d68b 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -53,8 +54,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
try {
dataMap.init(new DataMapModel(
- DataMapWriter.getDefaultDataMapPath(
- tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
+ DataMapWriter.getDefaultDataMapPath(tableIdentifier.getTablePath(),
+ segment.getSegmentNo(), dataMapName), segment.getConfiguration()));
} catch (MemoryException e) {
LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
return lstDataMap;
@@ -73,7 +74,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
try {
- dataMap.init(new DataMapModel(indexPath));
+ dataMap.init(new DataMapModel(indexPath, FileFactory.getConfiguration()));
} catch (MemoryException e) {
LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage()));
return lstDataMap;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index c80cc75..9c1e18d 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -383,11 +383,12 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options);
* this writer is not thread safe, use buildThreadSafeWriterForCSVInput in multi thread environment
* Build a {@link CarbonWriter}, which accepts row in CSV format object
* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+* @param configuration hadoop configuration object.
* @return CSVCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
```
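For illustration, a minimal usage sketch of the new signature, assuming a hypothetical local output path and two-column schema (the pattern follows the updated CarbonReaderExample in this same commit):
```
import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

// Each caller can now carry its own hadoop conf (e.g. per-user S3 credentials)
// instead of relying on a single global configuration.
Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

CarbonWriter writer = CarbonWriter.builder()
    .outputPath("/tmp/carbon-out")                      // hypothetical path
    .buildWriterForCSVInput(new Schema(fields), new Configuration(false));
writer.write(new String[]{"robot", "1"});
writer.close();
```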
```
@@ -395,12 +396,13 @@ public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema
* Can use this writer in multi-thread instance.
* Build a {@link CarbonWriter}, which accepts row in CSV format
* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-* @param numOfThreads number of threads() in which .write will be called.
+* @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
* @return CSVCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, Configuration configuration)
throws IOException, InvalidLoadOptionException;
```
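The thread-safe variant is intended to be shared by several writer threads. A minimal sketch, assuming a hypothetical output path and a fixed thread pool; the same pattern applies to the thread-safe Avro and JSON builders below:
```
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

// One writer, built for numOfThreads concurrent callers of .write().
short numOfThreads = 4;
Field[] fields = new Field[]{new Field("name", DataTypes.STRING)};
CarbonWriter writer = CarbonWriter.builder()
    .outputPath("/tmp/carbon-mt-out")                   // hypothetical path
    .buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads,
        new Configuration(false));

ExecutorService pool = Executors.newFixedThreadPool(numOfThreads);
for (int t = 0; t < numOfThreads; t++) {
  pool.submit(() -> {
    try {
      writer.write(new String[]{"robot"});
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MINUTES);
writer.close();
```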
@@ -410,11 +412,12 @@ public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfT
* this writer is not thread safe, use buildThreadSafeWriterForAvroInput in multi thread environment
* Build a {@link CarbonWriter}, which accepts Avro format object
* @param avroSchema avro Schema object {org.apache.avro.Schema}
+* @param configuration hadoop configuration object
* @return AvroCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
```
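A minimal Avro sketch, assuming a hypothetical one-field record schema and output path (schema parsing uses the standard avro Parser API):
```
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.sdk.file.CarbonWriter;

// Hypothetical record schema and values, for illustration only.
String avroSchema = "{\"type\":\"record\",\"name\":\"person\","
    + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
org.apache.avro.Schema nn = new org.apache.avro.Schema.Parser().parse(avroSchema);
GenericData.Record record = new GenericData.Record(nn);
record.put("name", "robot");

CarbonWriter writer = CarbonWriter.builder()
    .outputPath("/tmp/carbon-avro-out")                 // hypothetical path
    .buildWriterForAvroInput(nn, new Configuration(false));
writer.write(record);
writer.close();
```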
```
@@ -423,11 +426,13 @@ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throw
* Build a {@link CarbonWriter}, which accepts Avro object
* @param avroSchema avro Schema object {org.apache.avro.Schema}
* @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
* @return AvroCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short
+numOfThreads, Configuration configuration)
throws IOException, InvalidLoadOptionException
```
@@ -437,11 +442,12 @@ public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avr
* this writer is not thread safe, use buildThreadSafeWriterForJsonInput in multi thread environment
* Build a {@link CarbonWriter}, which accepts Json object
* @param carbonSchema carbon Schema object
+* @param configuration hadoop configuration object
* @return JsonCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
+public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration);
```
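A minimal JSON sketch, assuming a hypothetical one-field carbon schema and output path, and that JsonCarbonWriter accepts one JSON string per row (as in the TestNonTransactionalCarbonTableJsonWriter change in this commit):
```
import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

// Hypothetical schema and row, for illustration only.
Field[] fields = new Field[]{new Field("name", DataTypes.STRING)};

CarbonWriter writer = CarbonWriter.builder()
    .outputPath("/tmp/carbon-json-out")                 // hypothetical path
    .buildWriterForJsonInput(new Schema(fields), new Configuration(false));
writer.write("{\"name\":\"robot\"}");
writer.close();
```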
```
@@ -450,11 +456,12 @@ public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
* Build a {@link CarbonWriter}, which accepts Json object
* @param carbonSchema carbon Schema object
* @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object.
* @return JsonCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
+public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, Configuration configuration)
```
### Class org.apache.carbondata.sdk.file.CarbonWriter
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index ada1a8c..4eec4bf 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -23,6 +23,7 @@ import java.sql.Date;
import java.sql.Timestamp;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonReader;
@@ -55,7 +56,7 @@ public class CarbonReaderExample {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
- .buildWriterForCSVInput(new Schema(fields));
+ .buildWriterForCSVInput(new Schema(fields), new Configuration(false));
for (int i = 0; i < 10; i++) {
String[] row2 = new String[]{
@@ -98,7 +99,7 @@ public class CarbonReaderExample {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(strings)
- .build();
+ .build(new Configuration(false));
System.out.println("\nData:");
long day = 24L * 3600 * 1000;
@@ -116,7 +117,7 @@ public class CarbonReaderExample {
// Read data
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
- .build();
+ .build(new Configuration(false));
System.out.println("\nData:");
i = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 52d51b5..3abc342 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.sdk.file.*;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Example for testing CarbonWriter on S3
*/
@@ -56,7 +58,7 @@ public class SDKS3Example {
.setEndPoint(args[2])
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), new Configuration(false));
for (int i = 0; i < num; i++) {
writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
@@ -75,7 +77,7 @@ public class SDKS3Example {
.setAccessKey(args[0])
.setSecretKey(args[1])
.setEndPoint(args[2])
- .build();
+ .build(new Configuration(false));
System.out.println("\nData:");
int i = 0;
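For context, a hedged sketch of the multi-user idea this commit enables: each user hands its own credentials to the SDK through a separate hadoop Configuration rather than through global state. The keys come from org.apache.hadoop.fs.s3a.Constants (imported in S3UsingSDKExample above); the bucket path and credential values are placeholders:
```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;

import org.apache.carbondata.sdk.file.CarbonReader;

// Two users can read concurrently with different S3 credentials by
// passing distinct Configuration objects to build().
Configuration userAConf = new Configuration(false);
userAConf.set(Constants.ACCESS_KEY, "userA-access-key");  // placeholder values
userAConf.set(Constants.SECRET_KEY, "userA-secret-key");
userAConf.set(Constants.ENDPOINT, "http://s3.example.com");

CarbonReader reader = CarbonReader
    .builder("s3a://bucket/path", "_temp")                // hypothetical path
    .build(userAConf);
```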
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
index a011d80..86bf854 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.examples
import java.io.File
import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.examples.util.ExampleUtils
@@ -36,7 +37,7 @@ object DirectSQLExample {
def buildTestData(
path: String,
num: Int = 3,
- persistSchema: Boolean = false): Any = {
+ persistSchema: Boolean = false, sparkSession: SparkSession): Any = {
// getCanonicalPath gives path with \, but the code expects /.
val writerPath = path.replace("\\", "/");
@@ -56,7 +57,8 @@ object DirectSQLExample {
if (persistSchema) {
builder.persistSchemaFile(true)
}
- val writer = builder.buildWriterForCSVInput(new Schema(fields))
+ val writer = builder
+ .buildWriterForCSVInput(new Schema(fields), sparkSession.sparkContext.hadoopConfiguration)
var i = 0
while (i < num) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -82,7 +84,7 @@ object DirectSQLExample {
import carbonSession._
// 1. generate data file
cleanTestData(path)
- buildTestData(path, 20)
+ buildTestData(path, 20, sparkSession = carbonSession)
val readPath = path + "Fact/Part0/Segment_null"
println("Running SQL on carbon files directly")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 1795960..c5c9710 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.examples
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
@@ -52,12 +53,12 @@ object S3UsingSDKExample {
builder.outputPath(writerPath).isTransactionalTable(true)
.uniqueIdentifier(
System.currentTimeMillis)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
} else {
builder.outputPath(writerPath).isTransactionalTable(true)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
}
var i = 0
var row = num
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9e5edc1..fcfb346 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -113,11 +113,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
ReadCommittedScope readCommittedScope = null;
if (carbonTable.isTransactionalTable()) {
readCommittedScope = new LatestFilesReadCommittedScope(
- identifier.getTablePath() + "/Fact/Part0/Segment_null/");
+ identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
} else {
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
- readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+ readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(), job
+ .getConfiguration());
+ } else {
+ readCommittedScope.setConfiguration(job.getConfiguration());
}
}
// this will be null in case of corrupt schema file.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index e5e3165..eb9ff7c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -188,8 +188,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
Set<Segment> segmentSet = new HashSet<>(
- new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
- .getValidAndInvalidSegments().getValidSegments());
+ new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+ context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
if (updateTime != null) {
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
segmentDeleteList);
@@ -223,8 +223,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
if (partitionSpecs != null && partitionSpecs.size() > 0) {
List<Segment> validSegments =
- new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments()
- .getValidSegments();
+ new SegmentStatusManager(table.getAbsoluteTableIdentifier())
+ .getValidAndInvalidSegments().getValidSegments();
String uniqueId = String.valueOf(System.currentTimeMillis());
List<String> tobeUpdatedSegs = new ArrayList<>();
List<String> tobeDeletedSegs = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ba6e043..ba3accf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -143,7 +143,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<Segment> streamSegments = null;
// get all valid segments and set them into the configuration
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
+ readCommittedScope.getConfiguration());
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
.getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
@@ -583,7 +584,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
table, loadMetadataDetails);
SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
- new SegmentStatusManager(identifier)
+ new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
.getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
Map<String, Long> blockRowCountMapping = new HashMap<>();
Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
@@ -649,11 +650,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
if (readCommittedScope == null) {
ReadCommittedScope readCommittedScope;
if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
- readCommittedScope = new TableStatusReadCommittedScope(identifier);
+ readCommittedScope = new TableStatusReadCommittedScope(identifier, job.getConfiguration());
} else {
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
- readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+ readCommittedScope =
+ new LatestFilesReadCommittedScope(identifier.getTablePath(), job.getConfiguration());
}
}
this.readCommittedScope = readCommittedScope;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 99d8532..2d4f370 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -270,6 +270,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
throw new RuntimeException(e);
+ } finally {
+ ThreadLocalSessionInfo.unsetAll();
}
}
});
@@ -444,6 +446,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
} finally {
executorService.shutdownNow();
dataLoadExecutor.close();
+ ThreadLocalSessionInfo.unsetAll();
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index de0d731..40a0a62 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -112,7 +112,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
} else {
if (options != null) {
builder.outputPath(writerPath)
@@ -120,14 +120,15 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
} else {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext
+ .hadoopConfiguration)
}
}
var i = 0
@@ -544,7 +545,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
try {
val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
writer.write(record)
@@ -743,7 +744,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
.toString()
val builder = CarbonWriter.builder()
val writer = builder.outputPath(writerPath)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
for (i <- 0 until 5) {
writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(33000), i.toString))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index a1d4290..63fb2e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -21,6 +21,7 @@ import java.util
import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
@@ -304,7 +306,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
val resolveFilter: FilterResolverIntf =
CarbonTable.resolveFilter(andExpression, carbonTable.getAbsoluteTableIdentifier)
val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, resolveFilter)
- val segment = new Segment("0")
+ val segment = new Segment("0", new TableStatusReadCommittedScope(carbonTable
+ .getAbsoluteTableIdentifier, new Configuration(false)))
// get the pruned blocklets
val prunedBlocklets = exprWrapper.prune(List(segment).asJava, null)
prunedBlocklets.asScala.foreach { blocklet =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 08daa34..1b181bc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -59,9 +59,11 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
}
var i = 0
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 0b6813f..a03a5eb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -142,7 +142,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
if (options != null) {
builder.outputPath(writerPath)
@@ -150,14 +151,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
}
}
var i = 0
@@ -194,7 +197,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields),
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
@@ -228,7 +232,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
123).withBlockSize(2)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -992,7 +997,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val builder: CarbonWriterBuilder = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options)
- val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+ val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"));
writer.close()
@@ -1117,7 +1123,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
try {
val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
writer.write(record)
@@ -2091,7 +2098,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(intercept[RuntimeException] {
val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}.getMessage.toLowerCase.contains("column: name specified in sort columns"))
@@ -2131,7 +2139,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -2169,7 +2178,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder.sortBy(Array("id"))
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -2213,7 +2223,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -2253,7 +2264,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -2299,7 +2311,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -2346,7 +2359,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -2366,7 +2380,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val writer: CarbonWriter = CarbonWriter.builder
.outputPath(writerPath)
.withTableProperties(options)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
writer.write(Array("carbon", "1"))
writer.write(Array("hydrogen", "10"))
writer.write(Array("boron", "4"))
@@ -2384,7 +2398,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
// write local sort data
val writer1: CarbonWriter = CarbonWriter.builder
.outputPath(writerPath)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
writer1.write(Array("carbon", "1"))
writer1.write(Array("hydrogen", "10"))
writer1.write(Array("boron", "4"))
@@ -2493,7 +2507,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
fields(0) = new Field("name", DataTypes.STRING)
fields(1) = new Field("surname", DataTypes.STRING)
fields(2) = new Field("age", DataTypes.INT)
- val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+ val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < 100) {
{
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index ff5c062..17aae1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -98,7 +98,8 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
.outputPath(writerPath).isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis())
.withLoadOptions(options)
- .buildWriterForJsonInput(carbonSchema)
+ .buildWriterForJsonInput(carbonSchema,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(jsonRow)
writer.close()
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index dc13b16..e7fcf95 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -93,7 +93,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -140,7 +140,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, mySchema)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -184,7 +184,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json, mySchema)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -215,7 +215,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -245,7 +245,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -275,7 +275,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -305,7 +305,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -335,7 +335,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -365,7 +365,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val exception1 = intercept[UnsupportedOperationException] {
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -402,7 +402,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -438,7 +438,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -476,7 +476,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -509,7 +509,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -568,7 +568,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -663,7 +663,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -716,7 +716,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -786,7 +786,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
avroRec.put("struct_field_decimal", genericByteArray)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -858,7 +858,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
avroRec.put("dec_fields", genericByteArray)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -905,7 +905,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false)
+ .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -952,7 +953,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false)
+ .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -997,7 +999,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("id", bytes1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1042,7 +1044,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("dec_field", bytes1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1087,7 +1089,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("dec_field", bytes1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1128,7 +1130,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("dec_field", bytes)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1170,7 +1172,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
avroRec.put("dec_field", bytes)
val exception1 = intercept[Exception] {
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
}
@@ -1220,7 +1222,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -1256,7 +1258,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -1306,7 +1308,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 62ba03e..0421ea8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -68,11 +68,13 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true)
.localDictionaryThreshold(2000)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
}
var i = 0
while (i < rows) {
@@ -268,7 +270,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
""".stripMargin
val pschema= org.apache.avro.Schema.parse(mySchema)
val records = testUtil.jsonToAvro(jsonvalue, mySchema)
- val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
+ val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(records)
writer.close()
sql("DROP TABLE IF EXISTS sdkOutputTable")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 4e2197d..a8bdb31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.testsuite.createTable
import java.io.File
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -58,9 +59,11 @@ object TestSparkCarbonFileFormatWithSparkSession {
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+ Configuration(false))
} else {
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+ Configuration(false))
}
var i = 0
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index 39785a3..6e8e79b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -18,9 +18,10 @@
package org.apache.carbondata.spark.testsuite.dataload
import scala.collection.JavaConverters._
-
import java.io.{File, FilenameFilter}
+import org.apache.hadoop.conf.Configuration
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.reader.CarbonIndexFileReader
import org.apache.carbondata.core.util.CarbonProperties
@@ -64,7 +65,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
} else {
val segment = Segment.getSegment("0", carbonTable.getTablePath)
val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
}
for (carbonIndexPath <- carbonIndexPaths) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 0c42264..44bc243 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
@@ -281,7 +282,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
} else {
val segment = Segment.getSegment("0", carbonTable.getTablePath)
val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
assertResult(Math.max(4, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 046a2a6..a4bc6f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -22,12 +22,13 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -71,7 +72,7 @@ class CGDataMapFactory(
val files = file.listFiles()
files.map {f =>
val dataMap: CoarseGrainDataMap = new CGDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
+ dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
dataMap
}.toList.asJava
}
@@ -83,7 +84,8 @@ class CGDataMapFactory(
override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
val dataMap: CoarseGrainDataMap = new CGDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+ dataMap.init(new DataMapModel(mapDistributable.getFilePath, new
+ Configuration(false)))
Seq(dataMap).asJava
}
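Custom datamaps get the same treatment: DataMapModel now carries the Configuration into init, so factories no longer have to reach for a global one. A sketch using the CGDataMap class defined in this test, with an illustrative index path:

import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.core.datamap.dev.DataMapModel

// CGDataMap is the test's CoarseGrainDataMap implementation shown above.
val dataMap = new CGDataMap()
dataMap.init(new DataMapModel("/tmp/datamap/part0.idx",   // illustrative path
  new Configuration(false)))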
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index b13582b..57b3672 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -22,12 +22,14 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -68,7 +70,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
val files = file.listFiles()
files.map { f =>
val dataMap: FineGrainDataMap = new FGDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
+ dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
dataMap
}.toList.asJava
}
@@ -79,7 +81,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
override def getDataMaps(distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
val dataMap: FineGrainDataMap = new FGDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+ dataMap.init(new DataMapModel(mapDistributable.getFilePath, new Configuration(false)))
Seq(dataMap).asJava
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 8ebed1f..edd3e9c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -18,9 +18,9 @@
package org.apache.carbondata.spark.testsuite.datamap
import scala.collection.JavaConverters._
-
import java.io.{File, FilenameFilter}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -337,7 +337,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
} else {
val segment = Segment.getSegment("0", path)
val store = new SegmentFileStore(path, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
assertResult(true)(size > 0)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 133454a..f4c725e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.partition
import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
@@ -78,7 +79,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
} else {
val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index e3e8e68..9a0080c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -21,6 +21,7 @@ import java.util
import java.util.concurrent.{Callable, ExecutorService, Executors}
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.optimizer.CarbonFilters
@@ -352,7 +353,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
store.getIndexFiles
assert(store.getIndexFiles.size() == 10)
CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index af05613..3a650ec 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -117,7 +117,7 @@ class CarbonFileIndex(
}
CarbonInputFormat.setReadCommittedScope(
hadoopConf,
- new LatestFilesReadCommittedScope(indexFiles))
+ new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
filter match {
case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
case None => None
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index a5e1b39..62d9903 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -78,9 +78,10 @@ class SparkCarbonFileFormat extends FileFormat
override def inferSchema(sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
+ val conf = sparkSession.sessionState.newHadoopConf()
val tablePath = options.get("path") match {
case Some(path) =>
- FileFactory.checkAndAppendDefaultFs(path, sparkSession.sparkContext.hadoopConfiguration)
+ FileFactory.checkAndAppendDefaultFs(path, conf)
case _ if files.nonEmpty =>
FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
case _ =>
@@ -89,7 +90,8 @@ class SparkCarbonFileFormat extends FileFormat
if (options.get(CarbonCommonConstants.SORT_COLUMNS).isDefined) {
throw new UnsupportedOperationException("Cannot use sort columns during infer schema")
}
- val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+ val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""),
+ false, conf)
val table = CarbonTable.buildFromTableInfo(tableInfo)
var schema = new StructType
val fields = tableInfo.getFactTable.getListOfColumns.asScala.map { col =>
@@ -395,7 +397,7 @@ class SparkCarbonFileFormat extends FileFormat
vectorizedReader
} else {
val reader = new CarbonRecordReader(model,
- new SparkUnsafeRowReadSuport(requiredSchema), null)
+ new SparkUnsafeRowReadSuport(requiredSchema), broadcastedHadoopConf.value.value)
reader.initialize(split, hadoopAttemptContext)
reader
}
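The switch from sparkSession.sparkContext.hadoopConfiguration to sparkSession.sessionState.newHadoopConf() is the Spark-side half of the multi-user story: newHadoopConf() folds session-level settings into the returned Configuration, so two sessions sharing one SparkContext can present different S3 credentials. A sketch of that behaviour (keys illustrative; sessionState is reachable here because these sources live under org.apache.spark.sql):

// Two users sharing one SparkContext, each with session-scoped credentials.
val userA = spark.newSession()
userA.conf.set("fs.s3a.access.key", "keyA")
val userB = spark.newSession()
userB.conf.set("fs.s3a.access.key", "keyB")

// newHadoopConf() copies each session's conf entries onto a clone of the
// context-wide Hadoop configuration, so the values diverge per session.
val confA = userA.sessionState.newHadoopConf()   // fs.s3a.access.key = keyA
val confB = userB.sessionState.newHadoopConf()   // fs.s3a.access.key = keyB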
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 66c0224..825cdec 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -868,7 +868,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 26f67f8..43f04b8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -22,8 +22,8 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.util.SparkUtil
+import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
@@ -70,9 +70,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+ .sparkContext.hadoopConfiguration)
} else {
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+ .sparkContext.hadoopConfiguration)
}
var i = 0
@@ -333,7 +335,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
.toString()
val builder = CarbonWriter.builder()
val writer = builder.outputPath(writerPath)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), spark.sessionState.newHadoopConf())
for (i <- 0 until 3) {
// write a varchar with 75,000 length
writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
@@ -348,15 +350,12 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
|USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
.stripMargin)
- } else if (spark.sparkContext.version.startsWith("2.2")) {
+ } else {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
|OPTIONS("long_String_columns"="address") LOCATION
|'$writerPath' """.stripMargin)
- } else {
- // TODO. spark2.3 ?
- assert(false)
}
assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
@@ -371,14 +370,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
|'$writerPath', "long_String_columns" "address") """.stripMargin)
- } else if (spark.sparkContext.version.startsWith("2.2")) {
+ } else {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
|("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
- } else {
- // TODO. spark2.3 ?
- assert(false)
}
assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0fd4e34..57887a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -436,7 +436,7 @@ object CarbonDataRDDFactory {
res.foreach { resultOfSeg =>
resultSize = resultSize + resultOfSeg.size
resultOfSeg.foreach { resultOfBlock =>
- segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
}
}
val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)
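Call sites that previously passed an explicit null segment-file name now use a one-argument Segment constructor, which the patch treats as equivalent at these sites. In sketch form (loadName is illustrative):

import org.apache.carbondata.core.datamap.Segment

val loadName = "0"                 // illustrative segment number
val seg = new Segment(loadName)    // shorthand for new Segment(loadName, null)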
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index b77632d..4921b33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -159,14 +159,13 @@ object DeleteExecution {
resultOfBlock => {
if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
blockUpdateDetailsList.add(resultOfBlock._2._1)
- segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null))
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
// if this block is invalid then decrement block count in map.
if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
blockMappingVO.getSegmentNumberOfBlockMapping)
}
- }
- else {
+ } else {
// In case of failure , clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index e3da86d..2951283 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1202,7 +1202,7 @@ public final class CarbonDataMergerUtil {
segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
CarbonFile[] deleteDeltaFiles =
- segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName);
+ segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg), blockName);
String destFileName =
blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 19353d1..f6cc485 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -326,7 +326,7 @@ public final class CarbonLoaderUtil {
for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
// if the segments is in the list of marked for delete then update the status.
- if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
+ if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName()))) {
detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
} else if (segmentFilesTobeUpdated
.contains(Segment.toSegment(detail.getLoadName(), null))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index dd70cc9..a183197 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -74,8 +74,7 @@ public class AvroCarbonWriter extends CarbonWriter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonTable.class.getName());
- AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
- Configuration hadoopConf = new Configuration();
+ AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
CarbonTableOutputFormat format = new CarbonTableOutputFormat();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 627e060..a8899a7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -46,8 +46,7 @@ class CSVCarbonWriter extends CarbonWriter {
private TaskAttemptContext context;
private ObjectArrayWritable writable;
- CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
- Configuration hadoopConf = new Configuration();
+ CSVCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
CarbonTableOutputFormat format = new CarbonTableOutputFormat();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
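Both SDK writer constructors now receive the Configuration from the builder rather than instantiating their own, and hand it straight to CarbonTableOutputFormat, so the whole write path runs under the caller's settings. When no per-user settings are needed, the process-default configuration can still be passed through explicitly; a sketch using FileFactory.getConfiguration() as that default (path and schema illustrative):

import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

val writer = CarbonWriter.builder()
  .outputPath("/tmp/carbon-out")   // illustrative path
  .buildWriterForCSVInput(
    Schema.parseJson("""[{"name":"string"},{"age":"int"}]"""),
    FileFactory.getConfiguration())
writer.write(Array("alice", "30"))
writer.close()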