Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/09/11 12:16:15 UTC
[1/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3
Repository: carbondata
Updated Branches:
refs/heads/master 7c827c0a9 -> 8f1a029b9
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 4859dd2..58bf3ab 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -179,7 +179,8 @@ public class CarbonReaderBuilder {
* @throws IOException
* @throws InterruptedException
*/
- public <T> CarbonReader<T> build() throws IOException, InterruptedException {
+ public <T> CarbonReader<T> build(Configuration configuration)
+ throws IOException, InterruptedException {
// DB name is not applicable for SDK reader as, table will be never registered.
CarbonTable table;
if (isTransactionalTable) {
@@ -193,7 +194,7 @@ public class CarbonReaderBuilder {
}
}
final CarbonFileInputFormat format = new CarbonFileInputFormat();
- final Job job = new Job(new Configuration());
+ final Job job = new Job(configuration);
format.setTableInfo(job.getConfiguration(), table.getTableInfo());
format.setTablePath(job.getConfiguration(), table.getTablePath());
format.setTableName(job.getConfiguration(), table.getTableName());
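For callers, the effect of this change is that a Hadoop Configuration must now be supplied explicitly when building a reader, so each SDK user can carry its own settings (for example S3 credentials) instead of sharing a process-global configuration. A minimal usage sketch follows; the path, projection columns, and s3a credential values are illustrative, not from the patch:

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.sdk.file.CarbonReader;

// Per-user configuration; the s3a property values below are placeholders.
Configuration conf = new Configuration(false);
conf.set("fs.s3a.access.key", "user-access-key");
conf.set("fs.s3a.secret.key", "user-secret-key");

CarbonReader reader = CarbonReader.builder("s3a://bucket/table/path", "_temp")
    .projection(new String[]{"name", "age"})
    .build(conf);   // Configuration is now an explicit parameter

while (reader.hasNext()) {
  Object[] row = (Object[]) reader.readNextRow();
  // consume row ...
}
reader.close();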
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 89e69fb..28a0dde 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -45,14 +45,13 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
/**
@@ -74,10 +73,6 @@ public class CarbonWriterBuilder {
private int localDictionaryThreshold;
private boolean isLocalDictionaryEnabled;
- public CarbonWriterBuilder() {
- ThreadLocalSessionInfo.setCarbonSessionInfo(new CarbonSessionInfo());
- }
-
/**
* Sets the output path of the writer builder
* @param path is the absolute path where output files are written
@@ -398,13 +393,13 @@ public class CarbonWriterBuilder {
* @throws IOException
* @throws InvalidLoadOptionException
*/
- public CarbonWriter buildWriterForCSVInput(Schema schema)
+ public CarbonWriter buildWriterForCSVInput(Schema schema, Configuration configuration)
throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(schema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
this.schema = schema;
CarbonLoadModel loadModel = buildLoadModel(schema);
- return new CSVCarbonWriter(loadModel);
+ return new CSVCarbonWriter(loadModel, configuration);
}
/**
@@ -416,8 +411,8 @@ public class CarbonWriterBuilder {
* @throws IOException
* @throws InvalidLoadOptionException
*/
- public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
- throws IOException, InvalidLoadOptionException {
+ public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads,
+ Configuration configuration) throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(schema, "schema should not be null");
Objects.requireNonNull(numOfThreads, "numOfThreads should not be null");
Objects.requireNonNull(path, "path should not be null");
@@ -427,7 +422,7 @@ public class CarbonWriterBuilder {
}
CarbonLoadModel loadModel = buildLoadModel(schema);
loadModel.setSdkWriterCores(numOfThreads);
- return new CSVCarbonWriter(loadModel);
+ return new CSVCarbonWriter(loadModel, configuration);
}
/**
@@ -439,8 +434,8 @@ public class CarbonWriterBuilder {
* @throws IOException
* @throws InvalidLoadOptionException
*/
- public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema)
- throws IOException, InvalidLoadOptionException {
+ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema,
+ Configuration configuration) throws IOException, InvalidLoadOptionException {
this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
Objects.requireNonNull(schema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
@@ -450,7 +445,7 @@ public class CarbonWriterBuilder {
// removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder
// which will skip Conversion Step.
loadModel.setLoadWithoutConverterStep(true);
- return new AvroCarbonWriter(loadModel);
+ return new AvroCarbonWriter(loadModel, configuration);
}
/**
@@ -462,7 +457,7 @@ public class CarbonWriterBuilder {
* @throws InvalidLoadOptionException
*/
public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema,
- short numOfThreads)
+ short numOfThreads, Configuration configuration)
throws IOException, InvalidLoadOptionException {
this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
Objects.requireNonNull(schema, "schema should not be null");
@@ -478,7 +473,7 @@ public class CarbonWriterBuilder {
// which will skip Conversion Step.
loadModel.setLoadWithoutConverterStep(true);
loadModel.setSdkWriterCores(numOfThreads);
- return new AvroCarbonWriter(loadModel);
+ return new AvroCarbonWriter(loadModel, configuration);
}
/**
@@ -490,14 +485,14 @@ public class CarbonWriterBuilder {
* @throws IOException
* @throws InvalidLoadOptionException
*/
- public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema)
+ public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration)
throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(carbonSchema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
this.schema = carbonSchema;
CarbonLoadModel loadModel = buildLoadModel(carbonSchema);
loadModel.setJsonFileLoad(true);
- return new JsonCarbonWriter(loadModel);
+ return new JsonCarbonWriter(loadModel, configuration);
}
/**
@@ -510,11 +505,10 @@ public class CarbonWriterBuilder {
* @throws IOException
* @throws InvalidLoadOptionException
*/
- public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
- throws IOException, InvalidLoadOptionException {
+ public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads,
+ Configuration configuration) throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(carbonSchema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
- Objects.requireNonNull(numOfThreads, "numOfThreads should not be null");
if (numOfThreads <= 0) {
throw new IllegalArgumentException(" numOfThreads must be greater than 0");
}
@@ -522,7 +516,7 @@ public class CarbonWriterBuilder {
CarbonLoadModel loadModel = buildLoadModel(schema);
loadModel.setJsonFileLoad(true);
loadModel.setSdkWriterCores(numOfThreads);
- return new JsonCarbonWriter(loadModel);
+ return new JsonCarbonWriter(loadModel, configuration);
}
private void setCsvHeader(CarbonLoadModel model) {
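Correspondingly, every build method on CarbonWriterBuilder now takes a Configuration as its last parameter. A minimal sketch of the CSV path, assuming an illustrative output location and schema:

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

Field[] fields = new Field[2];
fields[0] = new Field("name", DataTypes.STRING);
fields[1] = new Field("age", DataTypes.INT);

Configuration conf = new Configuration(false);       // per-user configuration
CarbonWriter writer = CarbonWriter.builder()
    .outputPath("/tmp/carbon-out")                   // illustrative path
    .buildWriterForCSVInput(new Schema(fields), conf);

writer.write(new String[]{"robot0", "1"});
writer.close();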
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
index b6e7ad5..5f65539 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java
@@ -23,7 +23,6 @@ import java.util.Random;
import java.util.UUID;
import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -47,15 +46,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
private TaskAttemptContext context;
private ObjectArrayWritable writable;
- JsonCarbonWriter(CarbonLoadModel loadModel) throws IOException {
- Configuration OutputHadoopConf = FileFactory.getConfiguration();
- CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel);
+ JsonCarbonWriter(CarbonLoadModel loadModel, Configuration configuration) throws IOException {
+ CarbonTableOutputFormat.setLoadModel(configuration, loadModel);
CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
Random random = new Random();
TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
- TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID);
+ TaskAttemptContextImpl context = new TaskAttemptContextImpl(configuration, attemptID);
this.recordWriter = outputFormat.getRecordWriter(context);
this.context = context;
this.writable = new ObjectArrayWritable();
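With the constructor binding the supplied Configuration into the TaskAttemptContext (instead of reading the process-wide FileFactory configuration), two SDK users in one JVM can write under different identities. A hypothetical sketch; the paths, schema, and credential values are assumed:

import org.apache.hadoop.conf.Configuration;

import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.JsonCarbonWriter;
import org.apache.carbondata.sdk.file.Schema;

Schema schema = new Schema(new Field[]{new Field("name", DataTypes.STRING)});

Configuration userA = new Configuration(false);
userA.set("fs.s3a.access.key", "keyA");              // illustrative credentials
Configuration userB = new Configuration(false);
userB.set("fs.s3a.access.key", "keyB");

// Each writer is isolated to the Configuration it was built with, so the
// two loads can run concurrently under different S3 identities.
JsonCarbonWriter writerA = CarbonWriter.builder()
    .outputPath("s3a://bucket/tableA")               // illustrative paths
    .buildWriterForJsonInput(schema, userA);
JsonCarbonWriter writerB = CarbonWriter.builder()
    .outputPath("s3a://bucket/tableB")
    .buildWriterForJsonInput(schema, userB);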
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 4320edc..8f53125 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -83,7 +83,7 @@ public class AvroCarbonWriterTest {
GenericData.Record record = TestUtil.jsonToAvro(json, avroSchema);
try {
CarbonWriter writer = CarbonWriter.builder().outputPath(path).isTransactionalTable(true)
- .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema));
+ .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
writer.write(record);
@@ -151,7 +151,7 @@ public class AvroCarbonWriterTest {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.isTransactionalTable(true)
- .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema));
+ .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
writer.write(record);
@@ -243,7 +243,7 @@ public class AvroCarbonWriterTest {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.isTransactionalTable(true)
- .buildWriterForAvroInput(nn);
+ .buildWriterForAvroInput(nn, TestUtil.configuration);
for (int i = 0; i < 100; i++) {
writer.write(record);
@@ -304,7 +304,7 @@ public class AvroCarbonWriterTest {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.isTransactionalTable(true)
- .buildWriterForAvroInput(nn);
+ .buildWriterForAvroInput(nn, TestUtil.configuration);
for (int i = 0; i < 100; i++) {
writer.write(record);
@@ -340,7 +340,7 @@ public class AvroCarbonWriterTest {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.isTransactionalTable(true).sortBy(sortColumns)
- .buildWriterForAvroInput(nn);
+ .buildWriterForAvroInput(nn, TestUtil.configuration);
for (int i = 0; i < 100; i++) {
writer.write(record);
@@ -458,7 +458,7 @@ public class AvroCarbonWriterTest {
.uniqueIdentifier(System.currentTimeMillis()).outputPath(path);
try {
- writer.buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field));
+ writer.buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration);
Assert.fail();
} catch (Exception e) {
assert(e.getMessage().contains("Duplicate column name found in table schema"));
@@ -478,7 +478,7 @@ public class AvroCarbonWriterTest {
Map<String, String> loadOptions = new HashMap<String, String>();
loadOptions.put("bad_records_action", "fail");
CarbonWriter carbonWriter =
- writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field));
+ writer.isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForCSVInput(new org.apache.carbondata.sdk.file.Schema(field), TestUtil.configuration);
carbonWriter.write(new String[] { "k", "20-02-2233" });
carbonWriter.close();
Assert.fail();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index e71d061..3d59724 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -120,7 +120,7 @@ public class CSVCarbonWriterTest {
.isTransactionalTable(true)
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
String[] row = new String[]{
@@ -243,7 +243,7 @@ public class CSVCarbonWriterTest {
fields[1] = new Field("age", DataTypes.INT);
try {
carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
- outputPath(path).buildWriterForCSVInput(new Schema(fields));
+ outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
Assert.assertTrue(false);
@@ -263,7 +263,7 @@ public class CSVCarbonWriterTest {
fields[1] = new Field("age", DataTypes.INT);
try {
carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
- outputPath(path).buildWriterForCSVInput(new Schema(fields));
+ outputPath(path).buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
Assert.assertTrue(false);
@@ -289,7 +289,7 @@ public class CSVCarbonWriterTest {
.isTransactionalTable(true).taskNo(5)
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
for (int i = 0; i < 2; i++) {
String[] row = new String[]{
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
index c1d5f88..a7ec6cd 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
@@ -132,7 +132,7 @@ public class CSVNonTransactionalCarbonWriterTest {
builder = builder.withBlockSize(blockSize);
}
- CarbonWriter writer = builder.buildWriterForCSVInput(schema);
+ CarbonWriter writer = builder.buildWriterForCSVInput(schema, TestUtil.configuration);
for (int i = 0; i < rows; i++) {
writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
@@ -183,7 +183,7 @@ public class CSVNonTransactionalCarbonWriterTest {
.taskNo(System.nanoTime())
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
String[] row = new String[]{
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 54b3e9e..dedf30e 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -40,10 +40,13 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.CharEncoding;
+import org.apache.hadoop.conf.Configuration;
import org.junit.*;
public class CarbonReaderTest extends TestCase {
+ private Configuration conf = new Configuration(false);
+
@Before
public void cleanFile() {
assert (TestUtil.cleanMdtFile());
@@ -72,7 +75,7 @@ public class CarbonReaderTest extends TestCase {
TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true);
CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
- .projection(new String[]{"name", "age"}).build();
+ .projection(new String[]{"name", "age"}).build(conf);
// expected output after sorting
String[] name = new String[200];
@@ -99,7 +102,7 @@ public class CarbonReaderTest extends TestCase {
.builder(path, "_temp")
.isTransactionalTable(true)
.projection(new String[]{"name", "age"})
- .build();
+ .build(conf);
i = 0;
while (reader2.hasNext()) {
@@ -134,7 +137,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(true)
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -176,7 +179,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(true)
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -213,7 +216,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -249,7 +252,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -292,7 +295,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -335,7 +338,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age", "doubleField"})
.filter(orExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -378,7 +381,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -421,7 +424,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -464,7 +467,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -507,7 +510,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -550,7 +553,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age", "doubleField"})
.filter(andExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -582,7 +585,7 @@ public class CarbonReaderTest extends TestCase {
CarbonWriter carbonWriter = null;
try {
carbonWriter = builder.outputPath(path1).isTransactionalTable(false).uniqueIdentifier(12345)
- .buildWriterForCSVInput(schema);
+ .buildWriterForCSVInput(schema, TestUtil.configuration);
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
}
@@ -597,7 +600,7 @@ public class CarbonReaderTest extends TestCase {
CarbonWriter carbonWriter1 = null;
try {
carbonWriter1 = builder1.outputPath(path2).isTransactionalTable(false).uniqueIdentifier(12345)
- .buildWriterForCSVInput(schema1);
+ .buildWriterForCSVInput(schema1, TestUtil.configuration);
} catch (InvalidLoadOptionException e) {
e.printStackTrace();
}
@@ -608,14 +611,14 @@ public class CarbonReaderTest extends TestCase {
CarbonReader reader =
CarbonReader.builder(path1, "_temp").
projection(new String[] { "c1", "c3" })
- .isTransactionalTable(false).build();
+ .isTransactionalTable(false).build(conf);
} catch (Exception e){
System.out.println("Success");
}
CarbonReader reader1 =
CarbonReader.builder(path2, "_temp1")
.projection(new String[] { "p1", "p2" })
- .isTransactionalTable(false).build();
+ .isTransactionalTable(false).build(conf);
while (reader1.hasNext()) {
Object[] row1 = (Object[]) reader1.readNextRow();
@@ -643,7 +646,7 @@ public class CarbonReaderTest extends TestCase {
.builder(path, "_temp")
.projection(new String[]{"name", "name", "age", "name"})
.isTransactionalTable(true)
- .build();
+ .build(conf);
// expected output after sorting
String[] name = new String[100];
@@ -685,13 +688,13 @@ public class CarbonReaderTest extends TestCase {
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.isTransactionalTable(true)
- .build();
+ .build(conf);
// Reader 2
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.isTransactionalTable(true)
- .build();
+ .build(conf);
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
@@ -719,7 +722,7 @@ public class CarbonReaderTest extends TestCase {
TestUtil.writeFilesAndVerify(100, new Schema(fields), path, true);
CarbonReader reader = CarbonReader.builder(path, "_temp").isTransactionalTable(true)
- .projection(new String[]{"name", "age"}).build();
+ .projection(new String[]{"name", "age"}).build(conf);
reader.close();
String msg = "CarbonReader not initialise, please create it first.";
@@ -762,7 +765,7 @@ public class CarbonReaderTest extends TestCase {
.builder(path)
.projection(new String[]{"name", "age"})
.isTransactionalTable(true)
- .build();
+ .build(conf);
// expected output after sorting
String[] name = new String[100];
@@ -799,7 +802,7 @@ public class CarbonReaderTest extends TestCase {
CarbonReader reader = CarbonReader
.builder(path)
- .build();
+ .build(conf);
// expected output after sorting
String[] name = new String[100];
@@ -910,7 +913,7 @@ public class CarbonReaderTest extends TestCase {
CarbonReader reader = CarbonReader.builder(path, "_temp")
.projection(new String[]{"name", "age"})
- .build();
+ .build(conf);
// expected output after sorting
String[] name = new String[100];
@@ -987,7 +990,7 @@ public class CarbonReaderTest extends TestCase {
.persistSchemaFile(true)
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
String[] row = new String[]{
@@ -1045,7 +1048,7 @@ public class CarbonReaderTest extends TestCase {
, "dateField"
, "timeField"
, "decimalField"})
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -1108,7 +1111,7 @@ public class CarbonReaderTest extends TestCase {
.persistSchemaFile(true)
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
String[] row2 = new String[]{
@@ -1161,7 +1164,7 @@ public class CarbonReaderTest extends TestCase {
.builder(path, "_temp")
.isTransactionalTable(true)
.projection(strings)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -1224,7 +1227,7 @@ public class CarbonReaderTest extends TestCase {
.persistSchemaFile(true)
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
String[] row2 = new String[]{
@@ -1275,7 +1278,7 @@ public class CarbonReaderTest extends TestCase {
.builder(path, "_temp")
.projection(strings)
.isTransactionalTable(true)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -1338,7 +1341,7 @@ public class CarbonReaderTest extends TestCase {
.persistSchemaFile(true)
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), TestUtil.configuration);
for (int i = 0; i < 100; i++) {
String[] row2 = new String[]{
@@ -1381,7 +1384,7 @@ public class CarbonReaderTest extends TestCase {
.builder(path, "_temp")
.isTransactionalTable(true)
.projection(strings)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
@@ -1423,7 +1426,7 @@ public class CarbonReaderTest extends TestCase {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.isTransactionalTable(true)
- .build();
+ .build(conf);
// expected output after sorting
String[] name = new String[100];
@@ -1461,7 +1464,7 @@ public class CarbonReaderTest extends TestCase {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.isTransactionalTable(true)
- .build();
+ .build(conf);
// expected output after sorting
String[] name = new String[100];
@@ -1498,7 +1501,7 @@ public class CarbonReaderTest extends TestCase {
.builder(path, "_temp")
.projection(new String[]{})
.isTransactionalTable(true)
- .build();
+ .build(conf);
assert (false);
} catch (RuntimeException e) {
assert (e.getMessage().equalsIgnoreCase("Projection can't be empty"));
@@ -1517,7 +1520,7 @@ public class CarbonReaderTest extends TestCase {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
.isTransactionalTable(isTransactionalTable)
- .buildWriterForAvroInput(nn);
+ .buildWriterForAvroInput(nn, TestUtil.configuration);
for (int i = 0; i < 100; i++) {
writer.write(record);
@@ -1645,7 +1648,7 @@ public class CarbonReaderTest extends TestCase {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.isTransactionalTable(false)
- .build();
+ .build(conf);
// expected output
String name = "bob";
@@ -1688,7 +1691,7 @@ public class CarbonReaderTest extends TestCase {
.isTransactionalTable(false)
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
- .build();
+ .build(conf);
int i = 0;
while (reader.hasNext()) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
index 9a6e8e0..4cb816b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentAvroSdkWriterTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericData;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
@@ -59,7 +60,8 @@ public class ConcurrentAvroSdkWriterTest {
ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
try {
CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
- CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads);
+ CarbonWriter writer = builder.buildThreadSafeWriterForAvroInput(avroSchema, numOfThreads,
+ TestUtil.configuration);
// write in multi-thread
for (int i = 0; i < numOfThreads; i++) {
executorService.submit(new WriteLogic(writer, record));
@@ -76,7 +78,7 @@ public class ConcurrentAvroSdkWriterTest {
CarbonReader reader;
try {
reader =
- CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build();
+ CarbonReader.builder(path, "_temp").projection(new String[] { "name", "age" }).build(new Configuration(false));
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
index 8ce1ef1..9bb3f29 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ConcurrentSdkWriterTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
@@ -53,7 +54,7 @@ public class ConcurrentSdkWriterTest {
CarbonWriterBuilder builder = CarbonWriter.builder()
.outputPath(path);
CarbonWriter writer =
- builder.buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads);
+ builder.buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads, TestUtil.configuration);
// write in multi-thread
for (int i = 0; i < numOfThreads; i++) {
executorService.submit(new WriteLogic(writer));
@@ -72,7 +73,7 @@ public class ConcurrentSdkWriterTest {
reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
- .build();
+ .build(new Configuration(false));
int i = 0;
while (reader.hasNext()) {
Object[] row = (Object[]) reader.readNextRow();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 83026a2..2d5dbcd 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -38,10 +38,13 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonDecoder;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
public class TestUtil {
+ public static Configuration configuration = new Configuration();
+
public static GenericData.Record jsonToAvro(String json, String avroSchema) throws IOException {
InputStream input = null;
DataFileWriter writer = null;
@@ -130,7 +133,7 @@ public class TestUtil {
builder = builder.withBlockSize(blockSize);
}
- CarbonWriter writer = builder.buildWriterForCSVInput(schema);
+ CarbonWriter writer = builder.buildWriterForCSVInput(schema, configuration);
for (int i = 0; i < rows; i++) {
writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 30aa415..7a764dd 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -176,7 +177,7 @@ public class SearchRequestHandler {
if (uniqueSegments.get(segmentId) == null) {
segments.add(Segment.toSegment(segmentId,
new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
- loadMetadataDetails)));
+ loadMetadataDetails, FileFactory.getConfiguration())));
uniqueSegments.put(segmentId, 1);
} else {
uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
[2/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3
Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index ff2ffdd..809d68b 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.features.TableOperation;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -53,8 +54,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
try {
dataMap.init(new DataMapModel(
- DataMapWriter.getDefaultDataMapPath(
- tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName)));
+ DataMapWriter.getDefaultDataMapPath(tableIdentifier.getTablePath(),
+ segment.getSegmentNo(), dataMapName), segment.getConfiguration()));
} catch (MemoryException e) {
LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage());
return lstDataMap;
@@ -73,7 +74,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer, getDataMapSchema());
String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
try {
- dataMap.init(new DataMapModel(indexPath));
+ dataMap.init(new DataMapModel(indexPath, FileFactory.getConfiguration()));
} catch (MemoryException e) {
LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage()));
return lstDataMap;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index c80cc75..9c1e18d 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -383,11 +383,12 @@ public CarbonWriterBuilder withTableProperties(Map<String, String> options);
* this writer is not thread safe, use buildThreadSafeWriterForCSVInput in multi thread environment
* Build a {@link CarbonWriter}, which accepts row in CSV format object
* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
+* @param configuration hadoop configuration object.
* @return CSVCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
```
```
@@ -395,12 +396,13 @@ public CarbonWriter buildWriterForCSVInput(org.apache.carbondata.sdk.file.Schema
* Can use this writer in multi-thread instance.
* Build a {@link CarbonWriter}, which accepts row in CSV format
* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
-* @param numOfThreads number of threads() in which .write will be called.
+* @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
* @return CSVCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfThreads, Configuration configuration)
throws IOException, InvalidLoadOptionException;
```
@@ -410,11 +412,12 @@ public CarbonWriter buildThreadSafeWriterForCSVInput(Schema schema, short numOfT
* this writer is not thread safe, use buildThreadSafeWriterForAvroInput in multi thread environment
* Build a {@link CarbonWriter}, which accepts Avro format object
* @param avroSchema avro Schema object {org.apache.avro.Schema}
+* @param configuration hadoop configuration object
* @return AvroCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throws IOException, InvalidLoadOptionException;
+public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema, Configuration configuration) throws IOException, InvalidLoadOptionException;
```
```
@@ -423,11 +426,13 @@ public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema schema) throw
* Build a {@link CarbonWriter}, which accepts Avro object
* @param avroSchema avro Schema object {org.apache.avro.Schema}
* @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object
* @return AvroCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short numOfThreads)
+public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avroSchema, short
+numOfThreads, Configuration configuration)
throws IOException, InvalidLoadOptionException
```
@@ -437,11 +442,12 @@ public CarbonWriter buildThreadSafeWriterForAvroInput(org.apache.avro.Schema avr
* this writer is not thread safe, use buildThreadSafeWriterForJsonInput in multi thread environment
* Build a {@link CarbonWriter}, which accepts Json object
* @param carbonSchema carbon Schema object
+* @param configuration hadoop configuration object
* @return JsonCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
+public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema, Configuration configuration);
```
```
@@ -450,11 +456,12 @@ public JsonCarbonWriter buildWriterForJsonInput(Schema carbonSchema);
* Build a {@link CarbonWriter}, which accepts Json object
* @param carbonSchema carbon Schema object
* @param numOfThreads number of threads() in which .write will be called.
+* @param configuration hadoop configuration object.
* @return JsonCarbonWriter
* @throws IOException
* @throws InvalidLoadOptionException
*/
-public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads)
+public JsonCarbonWriter buildThreadSafeWriterForJsonInput(Schema carbonSchema, short numOfThreads, Configuration configuration)
```
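A minimal sketch of the thread-safe CSV variant documented above, with an assumed thread count and illustrative path and schema:

short numOfThreads = 4;                              // assumed thread count
Field[] fields = new Field[]{new Field("name", DataTypes.STRING)};
CarbonWriter writer = CarbonWriter.builder()
    .outputPath("/tmp/carbon-out")                   // illustrative path
    .buildThreadSafeWriterForCSVInput(new Schema(fields), numOfThreads,
        new Configuration(false));
// writer.write(...) may now be called from numOfThreads threads concurrently.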
### Class org.apache.carbondata.sdk.file.CarbonWriter
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index ada1a8c..4eec4bf 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -23,6 +23,7 @@ import java.sql.Date;
import java.sql.Timestamp;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonReader;
@@ -55,7 +56,7 @@ public class CarbonReaderExample {
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
- .buildWriterForCSVInput(new Schema(fields));
+ .buildWriterForCSVInput(new Schema(fields), new Configuration(false));
for (int i = 0; i < 10; i++) {
String[] row2 = new String[]{
@@ -98,7 +99,7 @@ public class CarbonReaderExample {
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(strings)
- .build();
+ .build(new Configuration(false));
System.out.println("\nData:");
long day = 24L * 3600 * 1000;
@@ -116,7 +117,7 @@ public class CarbonReaderExample {
// Read data
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
- .build();
+ .build(new Configuration(false));
System.out.println("\nData:");
i = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 52d51b5..3abc342 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.sdk.file.*;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Example for testing CarbonWriter on S3
*/
@@ -56,7 +58,7 @@ public class SDKS3Example {
.setEndPoint(args[2])
.outputPath(path);
- CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
+ CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields), new Configuration(false));
for (int i = 0; i < num; i++) {
writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
@@ -75,7 +77,7 @@ public class SDKS3Example {
.setAccessKey(args[0])
.setSecretKey(args[1])
.setEndPoint(args[2])
- .build();
+ .build(new Configuration(false));
System.out.println("\nData:");
int i = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
index a011d80..86bf854 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DirectSQLExample.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.examples
import java.io.File
import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.examples.util.ExampleUtils
@@ -36,7 +37,7 @@ object DirectSQLExample {
def buildTestData(
path: String,
num: Int = 3,
- persistSchema: Boolean = false): Any = {
+ persistSchema: Boolean = false, sparkSession: SparkSession): Any = {
// getCanonicalPath gives path with \, but the code expects /.
val writerPath = path.replace("\\", "/");
@@ -56,7 +57,8 @@ object DirectSQLExample {
if (persistSchema) {
builder.persistSchemaFile(true)
}
- val writer = builder.buildWriterForCSVInput(new Schema(fields))
+ val writer = builder
+ .buildWriterForCSVInput(new Schema(fields), sparkSession.sparkContext.hadoopConfiguration)
var i = 0
while (i < num) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -82,7 +84,7 @@ object DirectSQLExample {
import carbonSession._
// 1. generate data file
cleanTestData(path)
- buildTestData(path, 20)
+ buildTestData(path, 20, sparkSession = carbonSession)
val readPath = path + "Fact/Part0/Segment_null"
println("Running SQL on carbon files directly")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 1795960..c5c9710 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.examples
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
@@ -52,12 +53,12 @@ object S3UsingSDKExample {
builder.outputPath(writerPath).isTransactionalTable(true)
.uniqueIdentifier(
System.currentTimeMillis)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
} else {
builder.outputPath(writerPath).isTransactionalTable(true)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), new Configuration(false))
}
var i = 0
var row = num
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9e5edc1..fcfb346 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -113,11 +113,14 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
ReadCommittedScope readCommittedScope = null;
if (carbonTable.isTransactionalTable()) {
readCommittedScope = new LatestFilesReadCommittedScope(
- identifier.getTablePath() + "/Fact/Part0/Segment_null/");
+ identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration());
} else {
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
- readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+ readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath(), job
+ .getConfiguration());
+ } else {
+ readCommittedScope.setConfiguration(job.getConfiguration());
}
}
// this will be null in case of corrupt schema file.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index e5e3165..eb9ff7c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -188,8 +188,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
Set<Segment> segmentSet = new HashSet<>(
- new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
- .getValidAndInvalidSegments().getValidSegments());
+ new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+ context.getConfiguration()).getValidAndInvalidSegments().getValidSegments());
if (updateTime != null) {
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
segmentDeleteList);
@@ -223,8 +223,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
if (partitionSpecs != null && partitionSpecs.size() > 0) {
List<Segment> validSegments =
- new SegmentStatusManager(table.getAbsoluteTableIdentifier()).getValidAndInvalidSegments()
- .getValidSegments();
+ new SegmentStatusManager(table.getAbsoluteTableIdentifier())
+ .getValidAndInvalidSegments().getValidSegments();
String uniqueId = String.valueOf(System.currentTimeMillis());
List<String> tobeUpdatedSegs = new ArrayList<>();
List<String> tobeDeletedSegs = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ba6e043..ba3accf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -143,7 +143,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
List<UpdateVO> invalidTimestampsList = new ArrayList<>();
List<Segment> streamSegments = null;
// get all valid segments and set them into the configuration
- SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier,
+ readCommittedScope.getConfiguration());
SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
.getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
@@ -583,7 +584,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
table, loadMetadataDetails);
SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
- new SegmentStatusManager(identifier)
+ new SegmentStatusManager(identifier, readCommittedScope.getConfiguration())
.getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
Map<String, Long> blockRowCountMapping = new HashMap<>();
Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
@@ -649,11 +650,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
if (readCommittedScope == null) {
ReadCommittedScope readCommittedScope;
if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
- readCommittedScope = new TableStatusReadCommittedScope(identifier);
+ readCommittedScope = new TableStatusReadCommittedScope(identifier, job.getConfiguration());
} else {
readCommittedScope = getReadCommittedScope(job.getConfiguration());
if (readCommittedScope == null) {
- readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+ readCommittedScope =
+ new LatestFilesReadCommittedScope(identifier.getTablePath(), job.getConfiguration());
}
}
this.readCommittedScope = readCommittedScope;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 99d8532..2d4f370 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -270,6 +270,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
throw new RuntimeException(e);
+ } finally {
+ ThreadLocalSessionInfo.unsetAll();
}
}
});
@@ -444,6 +446,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
} finally {
executorService.shutdownNow();
dataLoadExecutor.close();
+ ThreadLocalSessionInfo.unsetAll();
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
}
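The finally-block additions follow standard thread-local hygiene; sketched in isolation, with the surrounding load logic elided:

// Pooled threads are reused across users, so per-thread session state
// (which now carries a user's Configuration) must be cleared on every
// exit path, success or failure.
try {
  // ... data load work bound to this thread's session ...
} finally {
  ThreadLocalSessionInfo.unsetAll();   // release per-thread session state
}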
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index de0d731..40a0a62 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -112,7 +112,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
} else {
if (options != null) {
builder.outputPath(writerPath)
@@ -120,14 +120,15 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
} else {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext
+ .hadoopConfiguration)
}
}
var i = 0
@@ -544,7 +545,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
try {
val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
writer.write(record)
@@ -743,7 +744,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
.toString()
val builder = CarbonWriter.builder()
val writer = builder.outputPath(writerPath)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), sqlContext.sparkContext.hadoopConfiguration)
for (i <- 0 until 5) {
writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(33000), i.toString))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index a1d4290..63fb2e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -21,6 +21,7 @@ import java.util
import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.indexstore.Blocklet
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.scan.expression.{ColumnExpression, LiteralExpression}
@@ -304,7 +306,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
val resolveFilter: FilterResolverIntf =
CarbonTable.resolveFilter(andExpression, carbonTable.getAbsoluteTableIdentifier)
val exprWrapper = DataMapChooser.getDefaultDataMap(carbonTable, resolveFilter)
- val segment = new Segment("0")
+ val segment = new Segment("0", new TableStatusReadCommittedScope(carbonTable
+ .getAbsoluteTableIdentifier, new Configuration(false)))
// get the pruned blocklets
val prunedBlocklets = exprWrapper.prune(List(segment).asJava, null)
prunedBlocklets.asScala.foreach { blocklet =>
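
A Segment used for pruning now carries the scope that committed it, so readers of its index files can pull the Configuration from the segment itself rather than from a global. The construction from the test, in Java form:

    ReadCommittedScope scope = new TableStatusReadCommittedScope(
        carbonTable.getAbsoluteTableIdentifier(), new Configuration(false));
    Segment segment = new Segment("0", scope);
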
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 08daa34..1b181bc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -59,9 +59,11 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
}
var i = 0
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 0b6813f..a03a5eb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -142,7 +142,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
if (options != null) {
builder.outputPath(writerPath)
@@ -150,14 +151,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
System.currentTimeMillis).withBlockSize(2)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
}
}
var i = 0
@@ -194,7 +197,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields),
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
@@ -228,7 +232,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.sortBy(sortColumns.toArray)
.uniqueIdentifier(
123).withBlockSize(2)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema),
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -992,7 +997,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val builder: CarbonWriterBuilder = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options)
- val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+ val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"));
writer.close()
@@ -1117,7 +1123,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
try {
val writer = CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
writer.write(record)
@@ -2091,7 +2098,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
assert(intercept[RuntimeException] {
val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}.getMessage.toLowerCase.contains("column: name specified in sort columns"))
@@ -2131,7 +2139,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -2169,7 +2178,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder.sortBy(Array("id"))
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -2213,7 +2223,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -2253,7 +2264,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -2299,7 +2311,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -2346,7 +2359,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -2366,7 +2380,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
val writer: CarbonWriter = CarbonWriter.builder
.outputPath(writerPath)
.withTableProperties(options)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
writer.write(Array("carbon", "1"))
writer.write(Array("hydrogen", "10"))
writer.write(Array("boron", "4"))
@@ -2384,7 +2398,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
// write local sort data
val writer1: CarbonWriter = CarbonWriter.builder
.outputPath(writerPath)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), sqlContext.sparkContext.hadoopConfiguration)
writer1.write(Array("carbon", "1"))
writer1.write(Array("hydrogen", "10"))
writer1.write(Array("boron", "4"))
@@ -2493,7 +2507,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
fields(0) = new Field("name", DataTypes.STRING)
fields(1) = new Field("surname", DataTypes.STRING)
fields(2) = new Field("age", DataTypes.INT)
- val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
+ val carbonWriter = builder.buildWriterForCSVInput(new Schema(fields),
+ sqlContext.sparkContext.hadoopConfiguration)
var i = 0
while (i < 100) {
{
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
index ff5c062..17aae1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableJsonWriter.scala
@@ -98,7 +98,8 @@ class TestNonTransactionalCarbonTableJsonWriter extends QueryTest with BeforeAnd
.outputPath(writerPath).isTransactionalTable(false)
.uniqueIdentifier(System.currentTimeMillis())
.withLoadOptions(options)
- .buildWriterForJsonInput(carbonSchema)
+ .buildWriterForJsonInput(carbonSchema,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(jsonRow)
writer.close()
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index dc13b16..e7fcf95 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -93,7 +93,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -140,7 +140,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, mySchema)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -184,7 +184,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json, mySchema)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -215,7 +215,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -245,7 +245,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -275,7 +275,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -305,7 +305,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -335,7 +335,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -365,7 +365,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val exception1 = intercept[UnsupportedOperationException] {
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
}
@@ -402,7 +402,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -438,7 +438,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -476,7 +476,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -509,7 +509,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -568,7 +568,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -663,7 +663,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -716,7 +716,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -786,7 +786,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
avroRec.put("struct_field_decimal", genericByteArray)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -858,7 +858,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
avroRec.put("dec_fields", genericByteArray)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -905,7 +905,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false)
+ .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -952,7 +953,8 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false)
+ .buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -997,7 +999,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("id", bytes1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1042,7 +1044,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("dec_field", bytes1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1087,7 +1089,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("dec_field", bytes1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1128,7 +1130,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val avroRec = new GenericData. Record(nn)
avroRec.put("dec_field", bytes)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(
@@ -1170,7 +1172,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
avroRec.put("dec_field", bytes)
val exception1 = intercept[Exception] {
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
}
@@ -1220,7 +1222,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -1256,7 +1258,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val record = testUtil.jsonToAvro(json1, schema1)
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(record)
writer.close()
sql(
@@ -1306,7 +1308,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
val writer = CarbonWriter.builder
- .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn, sqlContext.sparkContext.hadoopConfiguration)
writer.write(avroRec)
writer.close()
sql(s"create table sdkOutputTable(union_field struct<union_field0:decimal(10,2),union_field1:int>) " +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 62ba03e..0421ea8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -68,11 +68,13 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false).enableLocalDictionary(true)
.localDictionaryThreshold(2000)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
} else {
CarbonWriter.builder
.outputPath(writerPath).isTransactionalTable(false)
- .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
+ .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn,
+ sqlContext.sparkContext.hadoopConfiguration)
}
var i = 0
while (i < rows) {
@@ -268,7 +270,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
""".stripMargin
val pschema= org.apache.avro.Schema.parse(mySchema)
val records = testUtil.jsonToAvro(jsonvalue, mySchema)
- val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
+ val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema,
+ sqlContext.sparkContext.hadoopConfiguration)
writer.write(records)
writer.close()
sql("DROP TABLE IF EXISTS sdkOutputTable")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 4e2197d..a8bdb31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.testsuite.createTable
import java.io.File
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -58,9 +59,11 @@ object TestSparkCarbonFileFormatWithSparkSession {
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+ Configuration(false))
} else {
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), new
+ Configuration(false))
}
var i = 0
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index 39785a3..6e8e79b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -18,9 +18,10 @@
package org.apache.carbondata.spark.testsuite.dataload
import scala.collection.JavaConverters._
-
import java.io.{File, FilenameFilter}
+import org.apache.hadoop.conf.Configuration
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.reader.CarbonIndexFileReader
import org.apache.carbondata.core.util.CarbonProperties
@@ -64,7 +65,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
} else {
val segment = Segment.getSegment("0", carbonTable.getTablePath)
val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
}
for (carbonIndexPath <- carbonIndexPaths) {
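
SegmentFileStore.readIndexFiles now takes the Configuration to read the index files with. Sketch (tablePath and segmentFileName assumed to be in scope):

    SegmentFileStore store = new SegmentFileStore(tablePath, segmentFileName);
    // Configuration(false) skips loading the default hadoop resources
    store.readIndexFiles(new Configuration(false));
    // getIndexCarbonFiles()/getIndexFilesMap() are populated after this call
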
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 0c42264..44bc243 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
@@ -281,7 +282,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
} else {
val segment = Segment.getSegment("0", carbonTable.getTablePath)
val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
assertResult(Math.max(4, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 046a2a6..a4bc6f0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -22,12 +22,13 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -71,7 +72,7 @@ class CGDataMapFactory(
val files = file.listFiles()
files.map {f =>
val dataMap: CoarseGrainDataMap = new CGDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
+ dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
dataMap
}.toList.asJava
}
@@ -83,7 +84,8 @@ class CGDataMapFactory(
override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
val dataMap: CoarseGrainDataMap = new CGDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+ dataMap.init(new DataMapModel(mapDistributable.getFilePath, new
+ Configuration(false)))
Seq(dataMap).asJava
}
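
DataMapModel gains the same parameter, so a datamap rebuilt on an executor reads its backing file under that task's settings. In Java form (filePath and the dataMap instance assumed):

    dataMap.init(new DataMapModel(filePath, new Configuration(false)));
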
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index b13582b..57b3672 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -22,12 +22,14 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
+import org.apache.hadoop.conf.Configuration
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapModel, DataMapWriter}
import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
import org.apache.carbondata.core.datastore.FileReader
import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -68,7 +70,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
val files = file.listFiles()
files.map { f =>
val dataMap: FineGrainDataMap = new FGDataMap()
- dataMap.init(new DataMapModel(f.getCanonicalPath))
+ dataMap.init(new DataMapModel(f.getCanonicalPath, new Configuration(false)))
dataMap
}.toList.asJava
}
@@ -79,7 +81,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
override def getDataMaps(distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
val dataMap: FineGrainDataMap = new FGDataMap()
- dataMap.init(new DataMapModel(mapDistributable.getFilePath))
+ dataMap.init(new DataMapModel(mapDistributable.getFilePath, new Configuration(false)))
Seq(dataMap).asJava
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 8ebed1f..edd3e9c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -18,9 +18,9 @@
package org.apache.carbondata.spark.testsuite.datamap
import scala.collection.JavaConverters._
-
import java.io.{File, FilenameFilter}
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -337,7 +337,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
} else {
val segment = Segment.getSegment("0", path)
val store = new SegmentFileStore(path, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
assertResult(true)(size > 0)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 133454a..f4c725e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.partition
import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
@@ -78,7 +79,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
} else {
val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index e3e8e68..9a0080c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -21,6 +21,7 @@ import java.util
import java.util.concurrent.{Callable, ExecutorService, Executors}
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.optimizer.CarbonFilters
@@ -352,7 +353,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
- store.readIndexFiles()
+ store.readIndexFiles(new Configuration(false))
store.getIndexFiles
assert(store.getIndexFiles.size() == 10)
CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
index af05613..3a650ec 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -117,7 +117,7 @@ class CarbonFileIndex(
}
CarbonInputFormat.setReadCommittedScope(
hadoopConf,
- new LatestFilesReadCommittedScope(indexFiles))
+ new LatestFilesReadCommittedScope(indexFiles, hadoopConf))
filter match {
case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
case None => None
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index a5e1b39..62d9903 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -78,9 +78,10 @@ class SparkCarbonFileFormat extends FileFormat
override def inferSchema(sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
+ val conf = sparkSession.sessionState.newHadoopConf()
val tablePath = options.get("path") match {
case Some(path) =>
- FileFactory.checkAndAppendDefaultFs(path, sparkSession.sparkContext.hadoopConfiguration)
+ FileFactory.checkAndAppendDefaultFs(path, conf)
case _ if files.nonEmpty =>
FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
case _ =>
@@ -89,7 +90,8 @@ class SparkCarbonFileFormat extends FileFormat
if (options.get(CarbonCommonConstants.SORT_COLUMNS).isDefined) {
throw new UnsupportedOperationException("Cannot use sort columns during infer schema")
}
- val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+ val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""),
+ false, conf)
val table = CarbonTable.buildFromTableInfo(tableInfo)
var schema = new StructType
val fields = tableInfo.getFactTable.getListOfColumns.asScala.map { col =>
@@ -395,7 +397,7 @@ class SparkCarbonFileFormat extends FileFormat
vectorizedReader
} else {
val reader = new CarbonRecordReader(model,
- new SparkUnsafeRowReadSuport(requiredSchema), null)
+ new SparkUnsafeRowReadSuport(requiredSchema), broadcastedHadoopConf.value.value)
reader.initialize(split, hadoopAttemptContext)
reader
}
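
Two related fixes above: schema inference now derives its Configuration from the session state rather than the raw SparkContext configuration (presumably so session-level overrides apply), and the row-based read path hands the broadcast task-side Configuration to CarbonRecordReader instead of null. The session-scoped conf, as it would look from Java:

    Configuration conf = sparkSession.sessionState().newHadoopConf();
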
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 66c0224..825cdec 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -868,7 +868,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
builder.outputPath(writerPath)
.isTransactionalTable(false)
.uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
- .buildWriterForCSVInput(new Schema(fields))
+ .buildWriterForCSVInput(new Schema(fields), spark.sparkContext.hadoopConfiguration)
var i = 0
while (i < rows) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
index 26f67f8..43f04b8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -22,8 +22,8 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.util.SparkUtil
+import org.apache.spark.sql.carbondata.datasource.TestUtil.{spark, _}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
@@ -70,9 +70,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
val writer =
if (persistSchema) {
builder.persistSchemaFile(true)
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+ .sparkContext.hadoopConfiguration)
} else {
- builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
+ builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema), spark
+ .sparkContext.hadoopConfiguration)
}
var i = 0
@@ -333,7 +335,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
.toString()
val builder = CarbonWriter.builder()
val writer = builder.outputPath(writerPath)
- .buildWriterForCSVInput(Schema.parseJson(schema))
+ .buildWriterForCSVInput(Schema.parseJson(schema), spark.sessionState.newHadoopConf())
for (i <- 0 until 3) {
// write a varchar with 75,000 length
writer.write(Array[String](s"name_$i", RandomStringUtils.randomAlphabetic(75000), i.toString))
@@ -348,15 +350,12 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
s"""CREATE TABLE sdkOutputTable (name string, address string, age int)
|USING carbon OPTIONS (PATH '$writerPath', "long_String_columns" "address") """
.stripMargin)
- } else if (spark.sparkContext.version.startsWith("2.2")) {
+ } else {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTable (name string, address string, age int) USING carbon
|OPTIONS("long_String_columns"="address") LOCATION
|'$writerPath' """.stripMargin)
- } else {
- // TODO. spark2.3 ?
- assert(false)
}
assert(spark.sql("select * from sdkOutputTable where age = 0").count() == 1)
val op = spark.sql("select address from sdkOutputTable limit 1").collectAsList()
@@ -371,14 +370,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends FunSuite with BeforeAndA
.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS (PATH
|'$writerPath', "long_String_columns" "address") """.stripMargin)
- } else if (spark.sparkContext.version.startsWith("2.2")) {
+ } else {
//data source file format
spark.sql(
s"""CREATE TABLE sdkOutputTableWithoutSchema USING carbon OPTIONS
|("long_String_columns"="address") LOCATION '$writerPath' """.stripMargin)
- } else {
- // TODO. spark2.3 ?
- assert(false)
}
assert(spark.sql("select * from sdkOutputTableWithoutSchema where age = 0").count() == 1)
val op1 = spark.sql("select address from sdkOutputTableWithoutSchema limit 1").collectAsList()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0fd4e34..57887a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -436,7 +436,7 @@ object CarbonDataRDDFactory {
res.foreach { resultOfSeg =>
resultSize = resultSize + resultOfSeg.size
resultOfSeg.foreach { resultOfBlock =>
- segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
}
}
val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)
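
Call sites that previously passed an explicit null segment file switch to the new single-argument Segment constructor:

    // replaces new Segment(loadName, null) at these call sites
    Segment segment = new Segment(loadDetail.getLoadName());  // loadDetail: stand-in for the load detail object above
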
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index b77632d..4921b33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -159,14 +159,13 @@ object DeleteExecution {
resultOfBlock => {
if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
blockUpdateDetailsList.add(resultOfBlock._2._1)
- segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null))
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName))
// if this block is invalid then decrement block count in map.
if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
blockMappingVO.getSegmentNumberOfBlockMapping)
}
- }
- else {
+ } else {
// In case of failure , clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index e3da86d..2951283 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1202,7 +1202,7 @@ public final class CarbonDataMergerUtil {
segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
CarbonFile[] deleteDeltaFiles =
- segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName);
+ segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg), blockName);
String destFileName =
blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 19353d1..f6cc485 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -326,7 +326,7 @@ public final class CarbonLoaderUtil {
for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
// if the segments is in the list of marked for delete then update the status.
- if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
+ if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName()))) {
detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
} else if (segmentFilesTobeUpdated
.contains(Segment.toSegment(detail.getLoadName(), null))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index dd70cc9..a183197 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -74,8 +74,7 @@ public class AvroCarbonWriter extends CarbonWriter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonTable.class.getName());
- AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
- Configuration hadoopConf = new Configuration();
+ AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
CarbonTableOutputFormat format = new CarbonTableOutputFormat();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
index 627e060..a8899a7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java
@@ -46,8 +46,7 @@ class CSVCarbonWriter extends CarbonWriter {
private TaskAttemptContext context;
private ObjectArrayWritable writable;
- CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException {
- Configuration hadoopConf = new Configuration();
+ CSVCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf) throws IOException {
CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
CarbonTableOutputFormat format = new CarbonTableOutputFormat();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
[3/3] carbondata git commit: [CARBONDATA-2909] Multi user support for SDK on S3
Posted by ra...@apache.org.
[CARBONDATA-2909] Multi user support for SDK on S3
Added support for multiple users with different SK/AK pairs to write concurrently to S3.
Made it mandatory for the user to supply a Hadoop configuration when creating the SDK writer/reader.
Passed the Hadoop configuration down to the core layer so that FileFactory can access it.
Fixed various "SK/AK not found" exceptions in CarbonSparkFileFormat.
This closes #2678
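As a rough illustration of the new contract (not part of the patch itself): each user builds its own Hadoop Configuration carrying its own SK/AK and passes it when constructing the SDK reader or writer. The bucket path, keys, and endpoint below are placeholders, and the builder calls follow this patch's SDK API.

import java.io.IOException;

import org.apache.carbondata.sdk.file.CarbonReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;

public class MultiUserS3ReadSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Per-user Configuration instead of a single JVM-wide static one.
    Configuration conf = new Configuration();
    conf.set(Constants.ACCESS_KEY, "<user-access-key>");  // placeholder
    conf.set(Constants.SECRET_KEY, "<user-secret-key>");  // placeholder
    conf.set(Constants.ENDPOINT, "<s3-endpoint>");        // placeholder

    // The configuration is now a mandatory argument of build(), so two
    // users with different credentials can operate concurrently.
    CarbonReader reader = CarbonReader.builder("s3a://bucket/path", "_temp")
        .build(conf);
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      // process the row here
    }
    reader.close();
  }
}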
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8f1a029b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8f1a029b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8f1a029b
Branch: refs/heads/master
Commit: 8f1a029b9ad82cb3d1972e63be2055c84895661b
Parents: 7c827c0
Author: kunal642 <ku...@gmail.com>
Authored: Fri Aug 31 11:12:30 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Sep 11 17:46:05 2018 +0530
----------------------------------------------------------------------
.../carbondata/core/datamap/DataMapUtil.java | 10 ++-
.../apache/carbondata/core/datamap/Segment.java | 11 +++
.../carbondata/core/datamap/dev/DataMap.java | 3 +-
.../core/datamap/dev/DataMapModel.java | 12 ++-
.../filesystem/AbstractDFSCarbonFile.java | 18 +++--
.../core/datastore/impl/FileFactory.java | 5 ++
.../indexstore/BlockletDataMapIndexStore.java | 24 ++++--
.../indexstore/BlockletDataMapIndexWrapper.java | 14 +++-
.../TableBlockIndexUniqueIdentifierWrapper.java | 19 +++++
.../indexstore/blockletindex/BlockDataMap.java | 6 +-
.../blockletindex/BlockletDataMapFactory.java | 2 +-
.../blockletindex/BlockletDataMapModel.java | 12 +--
.../blockletindex/SegmentIndexFileStore.java | 37 ++++++---
.../core/metadata/SegmentFileStore.java | 22 +++---
.../core/metadata/schema/SchemaReader.java | 19 ++++-
.../core/mutate/CarbonUpdateUtil.java | 6 +-
.../LatestFilesReadCommittedScope.java | 25 +++++--
.../core/readcommitter/ReadCommittedScope.java | 6 ++
.../TableStatusReadCommittedScope.java | 19 ++++-
.../core/reader/CarbonHeaderReader.java | 13 +++-
.../core/reader/CarbonIndexFileReader.java | 12 ++-
.../carbondata/core/reader/ThriftReader.java | 20 ++++-
.../statusmanager/SegmentStatusManager.java | 23 +++---
.../util/AbstractDataFileFooterConverter.java | 15 +++-
.../core/util/BlockletDataMapUtil.java | 12 +--
.../apache/carbondata/core/util/CarbonUtil.java | 47 +++---------
.../core/util/DataFileFooterConverter.java | 10 +++
.../core/util/DataFileFooterConverter2.java | 13 +++-
.../core/util/DataFileFooterConverterV3.java | 11 +++
.../core/writer/CarbonIndexFileMergeWriter.java | 4 +-
.../TestBlockletDataMapFactory.java | 8 +-
.../carbondata/core/util/CarbonUtilTest.java | 4 +-
.../core/util/DataFileFooterConverterTest.java | 3 +-
.../bloom/BloomCoarseGrainDataMapFactory.java | 5 +-
.../datamap/bloom/BloomDataMapModel.java | 7 +-
.../lucene/LuceneFineGrainDataMapFactory.java | 7 +-
docs/sdk-guide.md | 21 ++++--
.../examples/sdk/CarbonReaderExample.java | 7 +-
.../carbondata/examples/sdk/SDKS3Example.java | 6 +-
.../carbondata/examples/DirectSQLExample.scala | 8 +-
.../carbondata/examples/S3UsingSDkExample.scala | 5 +-
.../hadoop/api/CarbonFileInputFormat.java | 7 +-
.../hadoop/api/CarbonOutputCommitter.java | 8 +-
.../hadoop/api/CarbonTableInputFormat.java | 10 ++-
.../hadoop/api/CarbonTableOutputFormat.java | 3 +
.../sdv/generated/SDKwriterTestCase.scala | 11 +--
...ithColumnMetCacheAndCacheLevelProperty.scala | 5 +-
...FileInputFormatWithExternalCarbonTable.scala | 6 +-
.../TestNonTransactionalCarbonTable.scala | 49 +++++++-----
...tNonTransactionalCarbonTableJsonWriter.scala | 3 +-
...ansactionalCarbonTableWithAvroDataType.scala | 58 +++++++-------
...ransactionalCarbonTableWithComplexType.scala | 9 ++-
...tSparkCarbonFileFormatWithSparkSession.scala | 7 +-
.../dataload/TestDataLoadWithFileName.scala | 5 +-
.../dataload/TestGlobalSortDataLoad.scala | 3 +-
.../testsuite/datamap/CGDataMapTestCase.scala | 8 +-
.../testsuite/datamap/FGDataMapTestCase.scala | 8 +-
.../testsuite/datamap/TestDataMapCommand.scala | 4 +-
.../TestDataLoadingForPartitionTable.scala | 3 +-
.../StandardPartitionTableLoadingTestCase.scala | 3 +-
.../execution/datasources/CarbonFileIndex.scala | 2 +-
.../datasources/SparkCarbonFileFormat.scala | 8 +-
.../datasource/SparkCarbonDataSourceTest.scala | 2 +-
...tCreateTableUsingSparkCarbonFileFormat.scala | 20 ++---
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../command/mutation/DeleteExecution.scala | 5 +-
.../processing/merger/CarbonDataMergerUtil.java | 2 +-
.../processing/util/CarbonLoaderUtil.java | 2 +-
.../carbondata/sdk/file/AvroCarbonWriter.java | 3 +-
.../carbondata/sdk/file/CSVCarbonWriter.java | 3 +-
.../sdk/file/CarbonReaderBuilder.java | 5 +-
.../sdk/file/CarbonWriterBuilder.java | 38 ++++------
.../carbondata/sdk/file/JsonCarbonWriter.java | 8 +-
.../sdk/file/AvroCarbonWriterTest.java | 14 ++--
.../sdk/file/CSVCarbonWriterTest.java | 8 +-
.../CSVNonTransactionalCarbonWriterTest.java | 4 +-
.../carbondata/sdk/file/CarbonReaderTest.java | 79 ++++++++++----------
.../sdk/file/ConcurrentAvroSdkWriterTest.java | 6 +-
.../sdk/file/ConcurrentSdkWriterTest.java | 5 +-
.../apache/carbondata/sdk/file/TestUtil.java | 5 +-
.../store/worker/SearchRequestHandler.java | 3 +-
81 files changed, 616 insertions(+), 349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index e015052..60c5233 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -99,7 +100,7 @@ public class DataMapUtil {
}
String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
- getValidAndInvalidSegments(carbonTable);
+ getValidAndInvalidSegments(carbonTable, FileFactory.getConfiguration());
List<Segment> validSegments = validAndInvalidSegmentsInfo.getValidSegments();
List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
DataMapExprWrapper dataMapExprWrapper = null;
@@ -140,7 +141,7 @@ public class DataMapUtil {
List<PartitionSpec> partitionsToPrune) throws IOException {
String className = "org.apache.carbondata.core.datamap.DistributableDataMapFormat";
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegmentsInfo =
- getValidAndInvalidSegments(carbonTable);
+ getValidAndInvalidSegments(carbonTable, validSegments.get(0).getConfiguration());
List<Segment> invalidSegments = validAndInvalidSegmentsInfo.getInvalidSegments();
DistributableDataMapFormat dataMapFormat =
createDataMapJob(carbonTable, dataMapExprWrapper, validSegments, invalidSegments,
@@ -152,8 +153,9 @@ public class DataMapUtil {
}
private static SegmentStatusManager.ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
- CarbonTable carbonTable) throws IOException {
- SegmentStatusManager ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier());
+ CarbonTable carbonTable, Configuration configuration) throws IOException {
+ SegmentStatusManager ssm =
+ new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), configuration);
return ssm.getValidAndInvalidSegments();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 30e811a..85445eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -32,6 +32,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Represents one load of carbondata
*/
@@ -64,6 +66,11 @@ public class Segment implements Serializable {
this.segmentNo = segmentNo;
}
+ public Segment(String segmentNo, ReadCommittedScope readCommittedScope) {
+ this.segmentNo = segmentNo;
+ this.readCommittedScope = readCommittedScope;
+ }
+
/**
* ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw
* a NullPointerException. In case getCommittedIndexFile is need to be accessed then
@@ -202,6 +209,10 @@ public class Segment implements Serializable {
return null;
}
+ public Configuration getConfiguration() {
+ return readCommittedScope.getConfiguration();
+ }
+
public Set<String> getFilteredIndexShardNames() {
return filteredIndexShardNames;
}
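In effect, a Segment can now be constructed with the ReadCommittedScope it belongs to and hands that scope's Hadoop Configuration to callers. A minimal sketch under that assumption (the identifier and configuration are supplied by the caller):

import java.io.IOException;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;

import org.apache.hadoop.conf.Configuration;

public class SegmentConfSketch {
  // A segment built with a scope exposes that scope's Configuration,
  // so downstream index reads no longer fall back to a global one.
  static Configuration confOf(AbsoluteTableIdentifier id, Configuration userConf)
      throws IOException {
    ReadCommittedScope scope = new TableStatusReadCommittedScope(id, userConf);
    Segment segment = new Segment("0", scope);  // "0" is a sample segment no.
    return segment.getConfiguration();          // returns userConf
  }
}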
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 456776b..47eeafe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -37,7 +37,8 @@ public interface DataMap<T extends Blocklet> {
/**
* It is called to load the data map to memory or to initialize it.
*/
- void init(DataMapModel dataMapModel) throws MemoryException, IOException;
+ void init(DataMapModel dataMapModel)
+ throws MemoryException, IOException;
/**
* Prune the datamap with resolved filter expression and partition information.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
index 76bbeee..5f4d1dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapModel.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datamap.dev;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Information required to build datamap
*/
@@ -24,11 +26,19 @@ public class DataMapModel {
private String filePath;
- public DataMapModel(String filePath) {
+ private Configuration configuration;
+
+ public DataMapModel(String filePath, Configuration configuration) {
this.filePath = filePath;
+ this.configuration = configuration;
}
public String getFilePath() {
return filePath;
}
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 5128022..41215a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -282,7 +282,12 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
@Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
int bufferSize, Configuration hadoopConf) throws IOException {
return getDataInputStream(path, fileType, bufferSize,
- CarbonUtil.inferCompressorFromFileName(path));
+ CarbonUtil.inferCompressorFromFileName(path), hadoopConf);
+ }
+
+ @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+ int bufferSize, String compressor) throws IOException {
+ return getDataInputStream(path, fileType, bufferSize, FileFactory.getConfiguration());
}
/**
@@ -305,12 +310,12 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
return new DataInputStream(new BufferedInputStream(stream));
}
- @Override public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
- int bufferSize, String compressor) throws IOException {
+ private DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
+ int bufferSize, String compressor, Configuration configuration) throws IOException {
path = path.replace("\\", "/");
Path pt = new Path(path);
InputStream inputStream;
- FileSystem fs = pt.getFileSystem(FileFactory.getConfiguration());
+ FileSystem fs = pt.getFileSystem(configuration);
if (bufferSize <= 0) {
inputStream = fs.open(pt);
} else {
@@ -509,7 +514,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
RemoteIterator<LocatedFileStatus> listStatus = null;
if (null != fileStatus && fileStatus.isDirectory()) {
Path path = fileStatus.getPath();
- listStatus = path.getFileSystem(FileFactory.getConfiguration()).listFiles(path, recursive);
+ listStatus = fs.listFiles(path, recursive);
} else {
return new ArrayList<CarbonFile>();
}
@@ -521,8 +526,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
if (null != fileStatus && fileStatus.isDirectory()) {
List<FileStatus> listStatus = new ArrayList<>();
Path path = fileStatus.getPath();
- RemoteIterator<LocatedFileStatus> iter =
- path.getFileSystem(FileFactory.getConfiguration()).listLocatedStatus(path);
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus fileStatus = iter.next();
if (pathFilter.accept(fileStatus.getPath()) && fileStatus.getLen() > 0) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index b462e0c..e8f6cfb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -111,6 +111,11 @@ public final class FileFactory {
return getDataInputStream(path, fileType, -1);
}
+ public static DataInputStream getDataInputStream(String path, FileType fileType,
+ Configuration configuration) throws IOException {
+ return getDataInputStream(path, fileType, -1, configuration);
+ }
+
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
throws IOException {
return getDataInputStream(path, fileType, bufferSize, getConfiguration());
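A small sketch of the new overload in use: the caller's Configuration (for example one carrying S3 credentials) is threaded through instead of the process-wide FileFactory.getConfiguration(). The names here are illustrative only.

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.carbondata.core.datastore.impl.FileFactory;

import org.apache.hadoop.conf.Configuration;

public class InputStreamConfSketch {
  // Opens a stream with the caller-supplied Configuration via the
  // overload added above (bufferSize defaults to -1 internally).
  static DataInputStream open(String path, Configuration userConf) throws IOException {
    return FileFactory.getDataInputStream(path, FileFactory.getFileType(path), userConf);
  }
}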
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index fa84f30..323899e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.BlockletDataMapUtil;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Class to handle loading, unloading,clearing,storing of the table
* blocks
@@ -87,7 +89,8 @@ public class BlockletDataMapIndexStore
List<BlockDataMap> dataMaps = new ArrayList<>();
if (blockletDataMapIndexWrapper == null) {
try {
- SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ SegmentIndexFileStore indexFileStore =
+ new SegmentIndexFileStore(identifierWrapper.getConfiguration());
Set<String> filesRead = new HashSet<>();
String segmentFilePath = identifier.getIndexFilePath();
if (segInfoCache == null) {
@@ -97,7 +100,8 @@ public class BlockletDataMapIndexStore
segInfoCache.get(segmentFilePath);
if (carbonDataFileBlockMetaInfoMapping == null) {
carbonDataFileBlockMetaInfoMapping =
- BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath);
+ BlockletDataMapUtil.createCarbonDataFileBlockMetaInfoMapping(segmentFilePath,
+ identifierWrapper.getConfiguration());
segInfoCache.put(segmentFilePath, carbonDataFileBlockMetaInfoMapping);
}
// if the identifier is not a merge file we can directly load the datamaps
@@ -107,10 +111,12 @@ public class BlockletDataMapIndexStore
carbonDataFileBlockMetaInfoMapping);
BlockDataMap blockletDataMap =
loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap,
- identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe());
+ identifierWrapper.getCarbonTable(), identifierWrapper.isAddTableBlockToUnsafe(),
+ identifierWrapper.getConfiguration());
dataMaps.add(blockletDataMap);
blockletDataMapIndexWrapper =
- new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
+ new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
+ identifierWrapper.getConfiguration());
} else {
// if the identifier is a merge file then collect the index files and load the datamaps
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
@@ -125,12 +131,14 @@ public class BlockletDataMapIndexStore
BlockDataMap blockletDataMap =
loadAndGetDataMap(blockIndexUniqueIdentifier, indexFileStore, blockMetaInfoMap,
identifierWrapper.getCarbonTable(),
- identifierWrapper.isAddTableBlockToUnsafe());
+ identifierWrapper.isAddTableBlockToUnsafe(),
+ identifierWrapper.getConfiguration());
dataMaps.add(blockletDataMap);
}
}
blockletDataMapIndexWrapper =
- new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps);
+ new BlockletDataMapIndexWrapper(identifier.getSegmentId(), dataMaps,
+ identifierWrapper.getConfiguration());
}
lruCache.put(identifier.getUniqueTableSegmentIdentifier(), blockletDataMapIndexWrapper,
blockletDataMapIndexWrapper.getMemorySize());
@@ -265,7 +273,7 @@ public class BlockletDataMapIndexStore
*/
private BlockDataMap loadAndGetDataMap(TableBlockIndexUniqueIdentifier identifier,
SegmentIndexFileStore indexFileStore, Map<String, BlockMetaInfo> blockMetaInfoMap,
- CarbonTable carbonTable, boolean addTableBlockToUnsafe)
+ CarbonTable carbonTable, boolean addTableBlockToUnsafe, Configuration configuration)
throws IOException, MemoryException {
String uniqueTableSegmentIdentifier =
identifier.getUniqueTableSegmentIdentifier();
@@ -279,7 +287,7 @@ public class BlockletDataMapIndexStore
dataMap.init(new BlockletDataMapModel(carbonTable,
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
- blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe));
+ blockMetaInfoMap, identifier.getSegmentId(), addTableBlockToUnsafe, configuration));
}
return dataMap;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
index 2cf0259..7b8a13b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java
@@ -24,19 +24,27 @@ import org.apache.carbondata.core.cache.Cacheable;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.indexstore.blockletindex.BlockDataMap;
+import org.apache.hadoop.conf.Configuration;
+
/**
* A cacheable wrapper of datamaps
*/
public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
+ private static final long serialVersionUID = -2859075086955465810L;
+
private List<BlockDataMap> dataMaps;
private String segmentId;
+ private transient Configuration configuration;
+
// size of the wrapper. basically the total size of the datamaps this wrapper is holding
private long wrapperSize;
- public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps) {
+ public BlockletDataMapIndexWrapper(String segmentId,List<BlockDataMap> dataMaps, Configuration
+ configuration) {
+ this.configuration = configuration;
this.dataMaps = dataMaps;
this.wrapperSize = 0L;
this.segmentId = segmentId;
@@ -72,4 +80,8 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable {
public String getSegmentId() {
return segmentId;
}
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
index 0924f1f..77756de 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifierWrapper.java
@@ -19,8 +19,11 @@ package org.apache.carbondata.core.indexstore;
import java.io.Serializable;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Class holds reference to TableBlockIndexUniqueIdentifier and carbonTable related info
* This is just a wrapper passed between methods like a context, This object must never be cached.
@@ -35,6 +38,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
// holds the reference to CarbonTable
private CarbonTable carbonTable;
+
+ private transient Configuration configuration;
/**
* flag to specify whether to load table block metadata in unsafe or safe. Default value is true
*/
@@ -44,6 +49,15 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable) {
this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
this.carbonTable = carbonTable;
+ this.configuration = FileFactory.getConfiguration();
+ }
+
+ public TableBlockIndexUniqueIdentifierWrapper(
+ TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier, CarbonTable carbonTable,
+ Configuration configuration) {
+ this.tableBlockIndexUniqueIdentifier = tableBlockIndexUniqueIdentifier;
+ this.carbonTable = carbonTable;
+ this.configuration = configuration;
}
// Note: The constructor is getting used in extensions with other functionalities.
@@ -53,6 +67,7 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
boolean addTableBlockToUnsafe) {
this(tableBlockIndexUniqueIdentifier, carbonTable);
this.addTableBlockToUnsafe = addTableBlockToUnsafe;
+ this.configuration = FileFactory.getConfiguration();
}
@@ -67,4 +82,8 @@ public class TableBlockIndexUniqueIdentifierWrapper implements Serializable {
public boolean isAddTableBlockToUnsafe() {
return addTableBlockToUnsafe;
}
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 2dbf6a0..57c92c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -103,11 +103,13 @@ public class BlockDataMap extends CoarseGrainDataMap
*/
protected boolean isFilePathStored;
- @Override public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
+ @Override public void init(DataMapModel dataMapModel)
+ throws IOException, MemoryException {
long startTime = System.currentTimeMillis();
assert (dataMapModel instanceof BlockletDataMapModel);
BlockletDataMapModel blockletDataMapInfo = (BlockletDataMapModel) dataMapModel;
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ DataFileFooterConverter fileFooterConverter =
+ new DataFileFooterConverter(dataMapModel.getConfiguration());
List<DataFileFooter> indexInfo = fileFooterConverter
.getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData(),
blockletDataMapInfo.getCarbonTable().isTransactionalTable());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index da2fa39..e16c3cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -137,7 +137,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
tableBlockIndexUniqueIdentifierWrappers.add(
new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
- this.getCarbonTable()));
+ this.getCarbonTable(), segment.getConfiguration()));
}
}
List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 180c812..0a75d59 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -22,6 +22,8 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.hadoop.conf.Configuration;
+
/**
* It is the model object to keep the information to build or initialize BlockletDataMap.
*/
@@ -37,9 +39,9 @@ public class BlockletDataMapModel extends DataMapModel {
private boolean addToUnsafe = true;
- public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
- byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
- super(filePath);
+ public BlockletDataMapModel(CarbonTable carbonTable, String filePath, byte[] fileData,
+ Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId, Configuration configuration) {
+ super(filePath, configuration);
this.fileData = fileData;
this.blockMetaInfoMap = blockMetaInfoMap;
this.segmentId = segmentId;
@@ -48,8 +50,8 @@ public class BlockletDataMapModel extends DataMapModel {
public BlockletDataMapModel(CarbonTable carbonTable, String filePath,
byte[] fileData, Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId,
- boolean addToUnsafe) {
- this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId);
+ boolean addToUnsafe, Configuration configuration) {
+ this(carbonTable, filePath, fileData, blockMetaInfoMap, segmentId, configuration);
this.addToUnsafe = addToUnsafe;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 25cfc26..f19c9c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.MergedBlockIndex;
import org.apache.carbondata.format.MergedBlockIndexHeader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TBase;
/**
@@ -76,10 +77,20 @@ public class SegmentIndexFileStore {
*/
private Map<String, List<String>> carbonMergeFileToIndexFilesMap;
+ private Configuration configuration;
+
public SegmentIndexFileStore() {
carbonIndexMap = new HashMap<>();
carbonIndexMapWithFullPath = new TreeMap<>();
carbonMergeFileToIndexFilesMap = new HashMap<>();
+ configuration = FileFactory.getConfiguration();
+ }
+
+ public SegmentIndexFileStore(Configuration configuration) {
+ carbonIndexMap = new HashMap<>();
+ carbonIndexMapWithFullPath = new TreeMap<>();
+ carbonMergeFileToIndexFilesMap = new HashMap<>();
+ this.configuration = configuration;
}
/**
@@ -89,7 +100,7 @@ public class SegmentIndexFileStore {
* @throws IOException
*/
public void readAllIIndexOfSegment(String segmentPath) throws IOException {
- CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+ CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, configuration);
for (int i = 0; i < carbonIndexFiles.length; i++) {
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
readMergeFile(carbonIndexFiles[i].getCanonicalPath());
@@ -155,7 +166,8 @@ public class SegmentIndexFileStore {
* @throws IOException
*/
public void readAllIndexAndFillBolckletInfo(String segmentPath) throws IOException {
- CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+ CarbonFile[] carbonIndexFiles =
+ getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
for (int i = 0; i < carbonIndexFiles.length; i++) {
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
readMergeFile(carbonIndexFiles[i].getCanonicalPath());
@@ -190,7 +202,8 @@ public class SegmentIndexFileStore {
* @throws IOException
*/
public Map<String, String> getIndexFilesFromSegment(String segmentPath) throws IOException {
- CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+ CarbonFile[] carbonIndexFiles =
+ getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
Map<String, String> indexFiles = new HashMap<>();
for (int i = 0; i < carbonIndexFiles.length; i++) {
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -216,7 +229,8 @@ public class SegmentIndexFileStore {
*/
public Map<String, String> getMergeOrIndexFilesFromSegment(String segmentPath)
throws IOException {
- CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
+ CarbonFile[] carbonIndexFiles =
+ getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
Map<String, String> indexFiles = new HashMap<>();
for (int i = 0; i < carbonIndexFiles.length; i++) {
if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
@@ -251,7 +265,7 @@ public class SegmentIndexFileStore {
* @throws IOException
*/
public void readMergeFile(String mergeFilePath) throws IOException {
- ThriftReader thriftReader = new ThriftReader(mergeFilePath);
+ ThriftReader thriftReader = new ThriftReader(mergeFilePath, configuration);
try {
thriftReader.open();
MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
@@ -259,7 +273,7 @@ public class SegmentIndexFileStore {
List<String> file_names = indexHeader.getFile_names();
carbonMergeFileToIndexFilesMap.put(mergeFilePath, file_names);
List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
- CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
+ CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath, configuration);
String mergeFileAbsolutePath = mergeFile.getParentFile().getAbsolutePath();
assert (file_names.size() == fileData.size());
for (int i = 0; i < file_names.size(); i++) {
@@ -282,8 +296,8 @@ public class SegmentIndexFileStore {
*/
private void readIndexFile(CarbonFile indexFile) throws IOException {
String indexFilePath = indexFile.getCanonicalPath();
- DataInputStream dataInputStream =
- FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath));
+ DataInputStream dataInputStream = FileFactory
+ .getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath), configuration);
byte[] bytes = new byte[(int) indexFile.getSize()];
try {
dataInputStream.readFully(bytes);
@@ -362,8 +376,8 @@ public class SegmentIndexFileStore {
* @param segmentPath
* @return
*/
- public static CarbonFile[] getCarbonIndexFiles(String segmentPath) {
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ public static CarbonFile[] getCarbonIndexFiles(String segmentPath, Configuration configuration) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, configuration);
return carbonFile.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
@@ -422,7 +436,8 @@ public class SegmentIndexFileStore {
indexReader.openThriftReader(indexFile.getCanonicalPath());
// get the index header
org.apache.carbondata.format.IndexHeader indexHeader = indexReader.readIndexHeader();
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ DataFileFooterConverter fileFooterConverter =
+ new DataFileFooterConverter(FileFactory.getConfiguration());
String filePath = FileFactory.getUpdatedFilePath(indexFile.getCanonicalPath());
String parentPath =
filePath.substring(0, filePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
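With the new constructor, every index-file read done by the store uses the caller's Configuration rather than the FileFactory default. A hedged usage sketch (the segment path is supplied by the caller):

import java.io.IOException;

import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;

import org.apache.hadoop.conf.Configuration;

public class IndexStoreConfSketch {
  // Reads all carbon index files of a segment with per-user credentials.
  static void readSegmentIndexes(String segmentPath, Configuration userConf)
      throws IOException {
    new SegmentIndexFileStore(userConf).readAllIIndexOfSegment(segmentPath);
  }
}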
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1acf0ea..44a2f7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
@@ -469,8 +470,8 @@ public class SegmentFileStore {
* @readSegment method before calling it.
* @throws IOException
*/
- public void readIndexFiles() throws IOException {
- readIndexFiles(SegmentStatus.SUCCESS, false);
+ public void readIndexFiles(Configuration configuration) throws IOException {
+ readIndexFiles(SegmentStatus.SUCCESS, false, configuration);
}
public SegmentFile getSegmentFile() {
@@ -484,8 +485,8 @@ public class SegmentFileStore {
* @param ignoreStatus
* @throws IOException
*/
- private List<String> readIndexFiles(SegmentStatus status, boolean ignoreStatus)
- throws IOException {
+ private List<String> readIndexFiles(SegmentStatus status, boolean ignoreStatus,
+ Configuration configuration) throws IOException {
if (indexFilesMap != null) {
return new ArrayList<>();
}
@@ -494,7 +495,7 @@ public class SegmentFileStore {
indexFilesMap = new HashMap<>();
indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(configuration);
for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
List<DataFileFooter> indexInfo =
fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
@@ -538,8 +539,8 @@ public class SegmentFileStore {
Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
- List<DataFileFooter> indexInfo =
- fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+ List<DataFileFooter> indexInfo = fileFooterConverter
+ .getIndexInfo(entry.getKey(), entry.getValue());
if (indexInfo.size() > 0) {
schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
}
@@ -733,8 +734,8 @@ public class SegmentFileStore {
// take the list of files from this segment.
SegmentFileStore fileStore =
new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
- List<String> indexOrMergeFiles =
- fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+ List<String> indexOrMergeFiles = fileStore
+ .readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false, FileFactory.getConfiguration());
if (forceDelete) {
deletePhysicalPartition(
partitionSpecs,
@@ -791,7 +792,8 @@ public class SegmentFileStore {
List<PartitionSpec> partitionSpecs,
SegmentUpdateStatusManager updateStatusManager) throws Exception {
SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
- List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+ List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
+ FileFactory.getConfiguration());
Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 57370f6..d0bc976 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.hadoop.conf.Configuration;
+
/**
* TODO: It should be removed after store manager implementation.
*/
@@ -81,10 +83,25 @@ public class SchemaReader {
}
public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
+ boolean isCarbonFileProvider, Configuration configuration) throws IOException {
+
+ org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+ .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider,
+ configuration);
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
+ identifier.getTablePath());
+ wrapperTableInfo.setTransactionalTable(false);
+ return wrapperTableInfo;
+ }
+
+ public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
boolean isCarbonFileProvider) throws IOException {
org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
- .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider);
+ .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider,
+ FileFactory.getConfiguration());
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 7df3937..d52eeb2 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -228,7 +228,7 @@ public class CarbonUpdateUtil {
}
// if the segments is in the list of marked for delete then update the status.
- if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName(), null))) {
+ if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) {
loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
}
@@ -391,7 +391,7 @@ public class CarbonUpdateUtil {
List<String> dataFiles = new ArrayList<>();
if (segment.getSegmentFileName() != null) {
SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
- fileStore.readIndexFiles();
+ fileStore.readIndexFiles(FileFactory.getConfiguration());
Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
List<String> dataFilePaths = new ArrayList<>();
for (List<String> paths : indexFilesMap.values()) {
@@ -737,7 +737,7 @@ public class CarbonUpdateUtil {
for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
if (eachSeg.getValue() == 0) {
- segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), null));
+ segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), ""));
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index abd9c2c..9dafed9 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -36,6 +36,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.hadoop.conf.Configuration;
+
/**
* This is a readCommittedScope for non transactional carbon table
*/
@@ -43,10 +45,12 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
@InterfaceStability.Stable
public class LatestFilesReadCommittedScope implements ReadCommittedScope {
+ private static final long serialVersionUID = -839970494288861816L;
private String carbonFilePath;
private String segmentId;
private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
private LoadMetadataDetails[] loadMetadataDetails;
+ private transient Configuration configuration;
/**
* a new constructor of this class
@@ -54,7 +58,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
* @param path carbon file path
* @param segmentId segment id
*/
- public LatestFilesReadCommittedScope(String path, String segmentId) throws IOException {
+ public LatestFilesReadCommittedScope(String path, String segmentId, Configuration configuration)
+ throws IOException {
+ this.configuration = configuration;
Objects.requireNonNull(path);
this.carbonFilePath = path;
this.segmentId = segmentId;
@@ -66,8 +72,9 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
*
* @param path carbon file path
*/
- public LatestFilesReadCommittedScope(String path) throws IOException {
- this(path, null);
+ public LatestFilesReadCommittedScope(String path, Configuration configuration)
+ throws IOException {
+ this(path, null, configuration);
}
/**
@@ -75,7 +82,8 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
*
* @param indexFiles carbon index files
*/
- public LatestFilesReadCommittedScope(CarbonFile[] indexFiles) {
+ public LatestFilesReadCommittedScope(CarbonFile[] indexFiles, Configuration configuration) {
+ this.configuration = configuration;
takeCarbonIndexFileSnapShot(indexFiles);
}
@@ -180,7 +188,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
carbonIndexFiles = indexFiles.toArray(new CarbonFile[0]);
} else {
String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId);
- carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+ carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, configuration);
}
if (carbonIndexFiles.length == 0) {
throw new IOException(
@@ -232,4 +240,11 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
prepareLoadMetadata();
}
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override public void setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index cbcf173..aea7e97 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -27,6 +27,8 @@ import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
+import org.apache.hadoop.conf.Configuration;
+
/**
* ReadCommitted interface that defines a read scope.
*/
@@ -48,4 +50,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
throws IOException;
void takeCarbonIndexFileSnapShot() throws IOException;
+
+ Configuration getConfiguration();
+
+ void setConfiguration(Configuration configuration);
}
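Because Hadoop's Configuration is not Serializable, the scope implementations in this patch hold it as a transient field; after a scope is deserialized the field is null, and the new setter lets the caller re-attach the right one. A minimal sketch of that pattern (the re-attachment point is an assumption, not code from the patch):

import org.apache.carbondata.core.readcommitter.ReadCommittedScope;

import org.apache.hadoop.conf.Configuration;

public class ScopeConfSketch {
  // Re-attach a Configuration to a scope whose transient field was lost
  // during serialization (e.g. after shipping the scope to an executor).
  static void reattach(ReadCommittedScope scope, Configuration userConf) {
    if (scope.getConfiguration() == null) {
      scope.setConfiguration(userConf);
    }
  }
}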
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 1f61aab..ac0d156 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -31,6 +31,8 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.hadoop.conf.Configuration;
+
/**
* ReadCommittedScope for the managed carbon table
*/
@@ -38,18 +40,24 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
@InterfaceStability.Stable
public class TableStatusReadCommittedScope implements ReadCommittedScope {
+ private static final long serialVersionUID = 2324397174595872738L;
private LoadMetadataDetails[] loadMetadataDetails;
private AbsoluteTableIdentifier identifier;
- public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) throws IOException {
+ private transient Configuration configuration;
+
+ public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
+ Configuration configuration) throws IOException {
this.identifier = identifier;
+ this.configuration = configuration;
takeCarbonIndexFileSnapShot();
}
public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
- LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+ LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) throws IOException {
this.identifier = identifier;
+ this.configuration = configuration;
this.loadMetadataDetails = loadMetadataDetails;
}
@@ -97,4 +105,11 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
}
+ @Override public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override public void setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
index dfd5815..d3d9177 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonHeaderReader.java
@@ -20,11 +20,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.format.FileHeader;
import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TBase;
/**
@@ -36,10 +38,17 @@ public class CarbonHeaderReader {
//Fact file path
private String filePath;
+ private Configuration configuration;
+
public CarbonHeaderReader(String filePath) {
this.filePath = filePath;
}
+ public CarbonHeaderReader(String filePath, Configuration configuration) {
+ this.filePath = filePath;
+ this.configuration = configuration;
+ }
+
/**
* It reads the metadata in FileFooter thrift object format.
*
@@ -62,12 +71,12 @@ public class CarbonHeaderReader {
* @throws IOException
*/
private ThriftReader openThriftReader(String filePath) {
-
+ Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration();
return new ThriftReader(filePath, new ThriftReader.TBaseCreator() {
@Override public TBase create() {
return new FileHeader();
}
- });
+ }, conf);
}
/**
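
openThriftReader now prefers the caller-supplied Configuration and only falls back to FileFactory.getConfiguration() when none was given. A sketch of the new two-argument constructor; the data file path is a placeholder and readHeader() is assumed to be the existing header-reading entry point:

    Configuration conf = new Configuration(false);  // assumed to carry the user's fs settings
    CarbonHeaderReader headerReader =
        new CarbonHeaderReader("/store/t1/part-0-0.carbondata", conf);  // hypothetical file
    FileHeader fileHeader = headerReader.readHeader();  // thrift reader opened with conf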
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
index 4617a12..27eaa32 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonIndexFileReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.IndexHeader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TBase;
/**
@@ -28,6 +29,15 @@ import org.apache.thrift.TBase;
*/
public class CarbonIndexFileReader {
+ private Configuration configuration;
+
+ public CarbonIndexFileReader() {
+
+ }
+
+ public CarbonIndexFileReader(Configuration configuration) {
+ this.configuration = configuration;
+ }
/**
* reader
*/
@@ -75,7 +85,7 @@ public class CarbonIndexFileReader {
* @throws IOException
*/
public void openThriftReader(String filePath) throws IOException {
- thriftReader = new ThriftReader(filePath);
+ thriftReader = new ThriftReader(filePath, configuration);
thriftReader.open();
}
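
The reader keeps the Configuration it was constructed with and hands it to the ThriftReader it opens; the no-arg constructor preserves the old default-configuration behaviour. A sketch with a placeholder index path, assuming the existing readIndexHeader/closeThriftReader methods:

    CarbonIndexFileReader indexReader = new CarbonIndexFileReader(conf);
    indexReader.openThriftReader("/store/t1/Fact/Part0/Segment_0/0.carbonindex");  // hypothetical
    IndexHeader indexHeader = indexReader.readIndexHeader();
    indexReader.closeThriftReader();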
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
index 221a285..48d8345 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -55,6 +56,8 @@ public class ThriftReader {
*/
private TProtocol binaryIn;
+ private Configuration configuration;
+
/**
* Constructor.
*/
@@ -63,6 +66,12 @@ public class ThriftReader {
this.creator = creator;
}
+ public ThriftReader(String fileName, TBaseCreator creator, Configuration configuration) {
+ this.fileName = fileName;
+ this.configuration = configuration;
+ this.creator = creator;
+ }
+
/**
* Constructor.
*/
@@ -73,6 +82,14 @@ public class ThriftReader {
/**
* Constructor.
*/
+ public ThriftReader(String fileName, Configuration configuration) {
+ this.fileName = fileName;
+ this.configuration = configuration;
+ }
+
+ /**
+ * Constructor.
+ */
public ThriftReader(byte[] fileData) {
dataInputStream = new DataInputStream(new ByteArrayInputStream(fileData));
binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
@@ -82,8 +99,9 @@ public class ThriftReader {
* Opens the fileName for reading.
*/
public void open() throws IOException {
+ Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration();
FileFactory.FileType fileType = FileFactory.getFileType(fileName);
- dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize);
+ dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize, conf);
binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
}
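
open() resolves the effective Configuration once, so a reader built with a Configuration-taking constructor reads through the caller's filesystem settings. A sketch assuming a FileHeader-bearing file at a placeholder path and the existing read()/close() methods:

    Configuration conf = new Configuration(false);
    ThriftReader reader = new ThriftReader("/store/t1/meta", new ThriftReader.TBaseCreator() {
      @Override public TBase create() {
        return new FileHeader();
      }
    }, conf);
    reader.open();   // uses conf; a null conf falls back to FileFactory.getConfiguration()
    TBase thriftObject = reader.read();
    reader.close();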
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index fdce76b..f1ee877 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -55,6 +55,7 @@ import org.apache.carbondata.core.util.DeleteLoadFolders;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
/**
* Manages Load/Segment status
@@ -66,8 +67,16 @@ public class SegmentStatusManager {
private AbsoluteTableIdentifier identifier;
+ private Configuration configuration;
+
public SegmentStatusManager(AbsoluteTableIdentifier identifier) {
this.identifier = identifier;
+ configuration = FileFactory.getConfiguration();
+ }
+
+ public SegmentStatusManager(AbsoluteTableIdentifier identifier, Configuration configuration) {
+ this.identifier = identifier;
+ this.configuration = configuration;
}
/**
@@ -93,21 +102,10 @@ public class SegmentStatusManager {
}
}
- /**
- * get valid segment for given table
- *
- * @return
- * @throws IOException
- */
public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
return getValidAndInvalidSegments(null, null);
}
- public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
- LoadMetadataDetails[] loadMetadataDetails) throws IOException {
- return getValidAndInvalidSegments(loadMetadataDetails, null);
- }
-
/**
* get valid segment for given load status details.
*/
@@ -129,7 +127,8 @@ public class SegmentStatusManager {
}
if (readCommittedScope == null) {
- readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails);
+ readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails,
+ configuration);
}
//just directly iterate Array
for (LoadMetadataDetails segment : loadMetadataDetails) {
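
Call sites that already run inside a configured session can keep the single-argument constructor, which snapshots FileFactory.getConfiguration(); multi-user SDK callers pass their own Configuration so the table status file is read with their credentials. A sketch, assuming the existing ValidAndInvalidSegmentsInfo accessor:

    SegmentStatusManager statusManager = new SegmentStatusManager(identifier, conf);
    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
        statusManager.getValidAndInvalidSegments();
    List<Segment> validSegments = segments.getValidSegments();  // scope built with conf above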
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 168a526..27bc620 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -47,11 +47,19 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Footer reader class
*/
public abstract class AbstractDataFileFooterConverter {
+ protected Configuration configuration;
+
+ AbstractDataFileFooterConverter(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
/**
* Below method will be used to convert the thrift presence meta to wrapper
* presence meta
@@ -77,7 +85,8 @@ public abstract class AbstractDataFileFooterConverter {
* @return list of index info
* @throws IOException problem while reading the index file
*/
- public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
+ public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo>
+ tableBlockInfoList)
throws IOException {
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
@@ -144,7 +153,7 @@ public abstract class AbstractDataFileFooterConverter {
*/
public List<DataFileFooter> getIndexInfo(String filePath, byte[] fileData,
boolean isTransactionalTable) throws IOException {
- CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader(configuration);
List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
try {
@@ -188,7 +197,7 @@ public abstract class AbstractDataFileFooterConverter {
}
if (readBlockIndexInfo.isSetBlocklet_info()) {
List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
- BlockletInfo blockletInfo = new DataFileFooterConverterV3()
+ BlockletInfo blockletInfo = new DataFileFooterConverterV3(configuration)
.getBlockletInfo(readBlockIndexInfo.getBlocklet_info(),
CarbonUtil.getNumberOfDimensionColumns(columnSchemaList));
blockletInfo.setBlockletIndex(blockletIndex);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 8e8b075..f14610c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -80,7 +81,7 @@ public class BlockletDataMapUtil {
&& indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
CarbonFile indexMergeFile = FileFactory.getCarbonFile(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getMergeIndexFileName());
+ .getMergeIndexFileName(), identifierWrapper.getConfiguration());
if (indexMergeFile.exists() && !filesRead.contains(indexMergeFile.getPath())) {
indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
filesRead.add(indexMergeFile.getPath());
@@ -89,7 +90,7 @@ public class BlockletDataMapUtil {
if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
- .getIndexFileName()) });
+ .getIndexFileName(), identifierWrapper.getConfiguration()) });
}
Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
CarbonTable carbonTable = identifierWrapper.getCarbonTable();
@@ -98,7 +99,8 @@ public class BlockletDataMapUtil {
tableColumnList =
carbonTable.getTableInfo().getFactTable().getListOfColumns();
}
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ DataFileFooterConverter fileFooterConverter =
+ new DataFileFooterConverter(identifierWrapper.getConfiguration());
List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
.getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
@@ -139,9 +141,9 @@ public class BlockletDataMapUtil {
* @throws IOException
*/
public static Map<String, BlockMetaInfo> createCarbonDataFileBlockMetaInfoMapping(
- String segmentFilePath) throws IOException {
+ String segmentFilePath, Configuration configuration) throws IOException {
Map<String, BlockMetaInfo> fileNameToMetaInfoMapping = new TreeMap();
- CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFilePath, configuration);
if (carbonFile instanceof AbstractDFSCarbonFile) {
PathFilter pathFilter = new PathFilter() {
@Override public boolean accept(Path path) {
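
The mapping helper now resolves the segment folder through the given Configuration, which matters when the path is on S3 and the default configuration lacks the user's credentials. A sketch with a hypothetical segment path:

    Map<String, BlockMetaInfo> metaInfoMap = BlockletDataMapUtil
        .createCarbonDataFileBlockMetaInfoMapping(
            "s3a://bucket/table/Fact/Part0/Segment_0", conf);  // hypothetical path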
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c9601c0..fa982be 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1277,34 +1277,6 @@ public final class CarbonUtil {
}
/**
- * Below method will be used to get all the block index info from index file
- *
- * @param taskId task id of the file
- * @param tableBlockInfoList list of table block
- * @param identifier absolute table identifier
- * @return list of block info
- * @throws IOException if any problem while reading
- */
- public static List<DataFileFooter> readCarbonIndexFile(String taskId, String bucketNumber,
- List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier identifier)
- throws IOException {
- // need to sort the block info list based for task in ascending order so
- // it will be sinkup with block index read from file
- Collections.sort(tableBlockInfoList);
- // geting the index file path
- //TODO need to pass proper partition number when partiton will be supported
- String carbonIndexFilePath = CarbonTablePath
- .getCarbonIndexFilePath(identifier.getTablePath(), taskId,
- tableBlockInfoList.get(0).getSegmentId(),
- bucketNumber, CarbonTablePath.DataFileUtil
- .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
- tableBlockInfoList.get(0).getVersion());
- DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
- // read the index info and return
- return fileFooterConverter.getIndexInfo(carbonIndexFilePath, tableBlockInfoList);
- }
-
- /**
* initialize the value of dictionary chunk that can be kept in memory at a time
*
* @return
@@ -2243,17 +2215,17 @@ public final class CarbonUtil {
}
}
- public static String getFilePathExternalFilePath(String path) {
+ public static String getFilePathExternalFilePath(String path, Configuration configuration) {
// return the list of carbondata files in the given path.
- CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+ CarbonFile segment = FileFactory.getCarbonFile(path, configuration);
CarbonFile[] dataFiles = segment.listFiles();
for (CarbonFile dataFile : dataFiles) {
if (dataFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
return dataFile.getAbsolutePath();
} else if (dataFile.isDirectory()) {
- return getFilePathExternalFilePath(dataFile.getAbsolutePath());
+ return getFilePathExternalFilePath(dataFile.getAbsolutePath(), configuration);
}
}
return null;
@@ -2265,12 +2237,14 @@ public final class CarbonUtil {
* @return table info containing the schema
*/
public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDataFilePath,
- String tableName, boolean isCarbonFileProvider) throws IOException {
+ String tableName, boolean isCarbonFileProvider, Configuration configuration)
+ throws IOException {
String fistFilePath = null;
if (isCarbonFileProvider) {
- fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+ fistFilePath = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null",
+ configuration);
} else {
- fistFilePath = getFilePathExternalFilePath(carbonDataFilePath);
+ fistFilePath = getFilePathExternalFilePath(carbonDataFilePath, configuration);
}
if (fistFilePath == null) {
// Check if we can infer the schema from the hive metastore.
@@ -2645,7 +2619,7 @@ public final class CarbonUtil {
HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
Map<String, SegmentFileStore.FolderDetails> locationMap = fileStore.getLocationMap();
if (locationMap != null) {
- fileStore.readIndexFiles();
+ fileStore.readIndexFiles(FileFactory.getConfiguration());
Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
// get the size of carbonindex file
carbonIndexSize = getCarbonIndexSize(fileStore, locationMap);
@@ -3192,8 +3166,7 @@ public final class CarbonUtil {
* @param carbonTable
* carbon Table
*/
- public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable)
- throws IOException {
+ public static ColumnarFormatVersion getFormatVersion(CarbonTable carbonTable) throws IOException {
String segmentPath = null;
boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
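
Both getFilePathExternalFilePath and inferSchema now thread the Configuration down to FileFactory instead of resolving the filesystem with defaults. A sketch of schema inference over an external (non-transactional) store; the store path and table name are placeholders:

    org.apache.carbondata.format.TableInfo inferredSchema =
        CarbonUtil.inferSchema("s3a://bucket/external-store", "t1", false, conf);  // hypothetical inputs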
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 670536e..61b4f37 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -33,12 +33,22 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonFooterReader;
import org.apache.carbondata.format.FileFooter;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to convert the thrift object of data file
* meta data to wrapper object
*/
public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
+ public DataFileFooterConverter(Configuration configuration) {
+ super(configuration);
+ }
+
+ public DataFileFooterConverter() {
+ super(FileFactory.getConfiguration());
+ }
+
/**
* Below method will be used to convert thrift file meta to wrapper file meta
*/
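
Each footer-converter subclass (V1 here, V2 and V3 below) now pairs a Configuration-taking constructor with a no-arg one that falls back to FileFactory.getConfiguration(). A sketch reading index info through an explicit Configuration; indexFilePath and indexFileData are assumed inputs:

    DataFileFooterConverter converter = new DataFileFooterConverter(conf);
    List<DataFileFooter> footers =
        converter.getIndexInfo(indexFilePath, indexFileData, true);  // hypothetical inputs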
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index b20a336..db52991 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -29,6 +30,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonFooterReader;
import org.apache.carbondata.format.FileFooter;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Below class will be used to convert the thrift object of data file
* meta data to wrapper object for version 2 data file
@@ -36,6 +39,14 @@ import org.apache.carbondata.format.FileFooter;
public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
+ public DataFileFooterConverter2(Configuration configuration) {
+ super(configuration);
+ }
+
+ public DataFileFooterConverter2() {
+ super(FileFactory.getConfiguration());
+ }
+
/**
* Below method will be used to convert thrift file meta to wrapper file meta
*/
@@ -136,6 +147,6 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
}
@Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
- return new DataFileFooterConverter().getSchema(tableBlockInfo);
+ return new DataFileFooterConverter(configuration).getSchema(tableBlockInfo);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 6a968b4..41e22fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -31,8 +32,18 @@ import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.format.FileFooter3;
import org.apache.carbondata.format.FileHeader;
+import org.apache.hadoop.conf.Configuration;
+
public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
+ public DataFileFooterConverterV3(Configuration configuration) {
+ super(configuration);
+ }
+
+ public DataFileFooterConverterV3() {
+ super(FileFactory.getConfiguration());
+ }
+
/**
* Below method will be used to convert thrift file meta to wrapper file meta
* This method will read the footer from footer offset present in the data file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 1634091..9dde6b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.SegmentFileStore;
@@ -77,7 +78,8 @@ public class CarbonIndexFileMergeWriter {
List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
} else {
- indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+ indexFiles =
+ SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
}
if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
if (sfs == null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index a3acfab..34dca0b 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -40,10 +40,12 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import mockit.Deencapsulation;
import mockit.Mock;
import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.Test;
@@ -103,7 +105,8 @@ public class TestBlockletDataMapFactory {
BlockletDataMapIndexWrapper.class);
method.setAccessible(true);
method.invoke(blockletDataMapFactory, tableBlockIndexUniqueIdentifierWrapper,
- new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps));
+ new BlockletDataMapIndexWrapper(tableBlockIndexUniqueIdentifier.getSegmentId(), dataMaps,
+ tableBlockIndexUniqueIdentifierWrapper.getConfiguration()));
BlockletDataMapIndexWrapper result = cache.getIfPresent(tableBlockIndexUniqueIdentifierWrapper);
assert null != result;
}
@@ -111,7 +114,8 @@ public class TestBlockletDataMapFactory {
@Test public void getValidDistributables() throws IOException {
BlockletDataMapDistributable blockletDataMapDistributable = new BlockletDataMapDistributable(
"/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756709.carbonindex");
- Segment segment = new Segment("0", null);
+ Segment segment = new Segment("0", null, new TableStatusReadCommittedScope(carbonTable
+ .getAbsoluteTableIdentifier(), new Configuration(false)));
blockletDataMapDistributable.setSegment(segment);
BlockletDataMapDistributable blockletDataMapDistributable1 = new BlockletDataMapDistributable(
"/opt/store/default/carbon_table/Fact/Part0/Segment_0/0_batchno0-0-1521012756701.carbonindex");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 5520bfb..a4abc61 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -684,8 +684,8 @@ public class CarbonUtilTest {
@Test public void testToGetSegmentString() {
List<Segment> list = new ArrayList<>();
- list.add(new Segment("1", null));
- list.add(new Segment("2", null));
+ list.add(new Segment("1", null, null));
+ list.add(new Segment("2", null, null));
String segments = CarbonUtil.convertToString(list);
assertEquals(segments, "1,2");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index e506994..2705b63 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.format.IndexHeader;
import mockit.Mock;
import mockit.MockUp;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import static junit.framework.TestCase.assertEquals;
@@ -143,7 +144,7 @@ public class DataFileFooterConverterTest {
new MockUp<FileFactory>() {
@SuppressWarnings("unused") @Mock
public DataInputStream getDataInputStream(String path, FileFactory.FileType fileType,
- int bufferSize) {
+ int bufferSize, Configuration configuration) {
return dataInputStream;
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 0d240c4..1e5b79c 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -237,7 +237,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
}
for (String shard : shardPaths) {
BloomCoarseGrainDataMap bloomDM = new BloomCoarseGrainDataMap();
- bloomDM.init(new BloomDataMapModel(shard, cache));
+ bloomDM.init(new BloomDataMapModel(shard, cache, segment.getConfiguration()));
bloomDM.initIndexColumnConverters(getCarbonTable(), dataMapMeta.getIndexedColumns());
dataMaps.add(bloomDM);
}
@@ -253,7 +253,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
BloomCoarseGrainDataMap bloomCoarseGrainDataMap = new BloomCoarseGrainDataMap();
String indexPath = ((BloomDataMapDistributable) distributable).getIndexPath();
- bloomCoarseGrainDataMap.init(new BloomDataMapModel(indexPath, cache));
+ bloomCoarseGrainDataMap
+ .init(new BloomDataMapModel(indexPath, cache, FileFactory.getConfiguration()));
bloomCoarseGrainDataMap.initIndexColumnConverters(getCarbonTable(),
dataMapMeta.getIndexedColumns());
coarseGrainDataMaps.add(bloomCoarseGrainDataMap);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8f1a029b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
index 9d5d741..7ae4906 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapModel.java
@@ -19,13 +19,16 @@ package org.apache.carbondata.datamap.bloom;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.hadoop.conf.Configuration;
+
public class BloomDataMapModel extends DataMapModel {
private Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache;
public BloomDataMapModel(String filePath,
- Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache) {
- super(filePath);
+ Cache<BloomCacheKeyValue.CacheKey, BloomCacheKeyValue.CacheValue> cache,
+ Configuration configuration) {
+ super(filePath, configuration);
this.cache = cache;
}
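
With DataMapModel carrying the Configuration, the bloom datamap reads its shard files through the caller's filesystem settings. A sketch mirroring the factory call sites above; shardPath and cache are assumed to come from the factory:

    BloomDataMapModel model = new BloomDataMapModel(shardPath, cache, conf);
    BloomCoarseGrainDataMap bloomDataMap = new BloomCoarseGrainDataMap();
    bloomDataMap.init(model);  // shard index is loaded via the supplied conf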