You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:28 UTC
[03/50] [abbrv] carbondata git commit: [CARBONDATA-2489] Coverity scan fixes
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
deleted file mode 100644
index 16d4d53..0000000
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.converter.SchemaConverter;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
-import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.metadata.schema.table.TableSchema;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
-import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
-import org.apache.carbondata.core.writer.ThriftWriter;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
-import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
-import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
-import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.util.TableOptionConstant;
-
-import com.google.gson.Gson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * This class will create store file based on provided schema
- *
- */
-public class StoreCreator {
-
- private static AbsoluteTableIdentifier identifier;
- private static String storePath = "";
- static {
- try {
- storePath = new File("target/store").getCanonicalPath();
- String dbName = "testdb";
- String tableName = "testtable";
- identifier =
- AbsoluteTableIdentifier.from(
- storePath + "/testdb/testtable",
- new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
- } catch (IOException ex) {
-
- }
- }
-
- public static AbsoluteTableIdentifier getIdentifier() {
- return identifier;
- }
-
- /**
- * Create store without any restructure
- */
- public static void createCarbonStore() {
- try {
- String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
- File storeDir = new File(storePath);
- CarbonUtil.deleteFoldersAndFiles(storeDir);
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
- storePath);
-
- CarbonTable table = createTable();
- writeDictionary(factFilePath, table);
- CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
- CarbonLoadModel loadModel = new CarbonLoadModel();
- loadModel.setCarbonDataLoadSchema(schema);
- loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
- loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
- loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
- loadModel.setCarbonTransactionalTable(true);
- loadModel.setFactFilePath(factFilePath);
- loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
- loadModel.setTablePath(identifier.getTablePath());
- loadModel.setDateFormat(null);
- loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
- loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
- loadModel
- .setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
- loadModel
- .setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
- loadModel
- .setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
- loadModel
- .setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
- loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
- loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
- loadModel.setTaskNo("0");
- loadModel.setSegmentId("0");
- loadModel.setFactTimeStamp(System.currentTimeMillis());
- loadModel.setMaxColumns("10");
-
- loadData(loadModel, storePath);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private static CarbonTable createTable() throws IOException {
- TableInfo tableInfo = new TableInfo();
- tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
- TableSchema tableSchema = new TableSchema();
- tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
- List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
- ArrayList<Encoding> encodings = new ArrayList<>();
- encodings.add(Encoding.DICTIONARY);
- ColumnSchema id = new ColumnSchema();
- id.setColumnName("ID");
- id.setColumnar(true);
- id.setDataType(DataTypes.INT);
- id.setEncodingList(encodings);
- id.setColumnUniqueId(UUID.randomUUID().toString());
- id.setDimensionColumn(true);
- id.setColumnGroup(1);
- columnSchemas.add(id);
-
- ColumnSchema date = new ColumnSchema();
- date.setColumnName("date");
- date.setColumnar(true);
- date.setDataType(DataTypes.STRING);
- date.setEncodingList(encodings);
- date.setColumnUniqueId(UUID.randomUUID().toString());
- date.setDimensionColumn(true);
- date.setColumnGroup(2);
- columnSchemas.add(date);
-
- ColumnSchema country = new ColumnSchema();
- country.setColumnName("country");
- country.setColumnar(true);
- country.setDataType(DataTypes.STRING);
- country.setEncodingList(encodings);
- country.setColumnUniqueId(UUID.randomUUID().toString());
- country.setDimensionColumn(true);
- country.setColumnGroup(3);
- columnSchemas.add(country);
-
- ColumnSchema name = new ColumnSchema();
- name.setColumnName("name");
- name.setColumnar(true);
- name.setDataType(DataTypes.STRING);
- name.setEncodingList(encodings);
- name.setColumnUniqueId(UUID.randomUUID().toString());
- name.setDimensionColumn(true);
- name.setColumnGroup(4);
- columnSchemas.add(name);
-
- ColumnSchema phonetype = new ColumnSchema();
- phonetype.setColumnName("phonetype");
- phonetype.setColumnar(true);
- phonetype.setDataType(DataTypes.STRING);
- phonetype.setEncodingList(encodings);
- phonetype.setColumnUniqueId(UUID.randomUUID().toString());
- phonetype.setDimensionColumn(true);
- phonetype.setColumnGroup(5);
- columnSchemas.add(phonetype);
-
- ColumnSchema serialname = new ColumnSchema();
- serialname.setColumnName("serialname");
- serialname.setColumnar(true);
- serialname.setDataType(DataTypes.STRING);
- serialname.setEncodingList(encodings);
- serialname.setColumnUniqueId(UUID.randomUUID().toString());
- serialname.setDimensionColumn(true);
- serialname.setColumnGroup(6);
- columnSchemas.add(serialname);
-
- ColumnSchema salary = new ColumnSchema();
- salary.setColumnName("salary");
- salary.setColumnar(true);
- salary.setDataType(DataTypes.INT);
- salary.setEncodingList(new ArrayList<Encoding>());
- salary.setColumnUniqueId(UUID.randomUUID().toString());
- salary.setDimensionColumn(false);
- salary.setColumnGroup(7);
- columnSchemas.add(salary);
-
- tableSchema.setListOfColumns(columnSchemas);
- SchemaEvolution schemaEvol = new SchemaEvolution();
- schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
- tableSchema.setSchemaEvalution(schemaEvol);
- tableSchema.setTableId(UUID.randomUUID().toString());
- tableInfo.setTableUniqueName(
- identifier.getCarbonTableIdentifier().getTableUniqueName()
- );
- tableInfo.setLastUpdatedTime(System.currentTimeMillis());
- tableInfo.setFactTable(tableSchema);
- tableInfo.setTablePath(identifier.getTablePath());
-
- String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
- String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
- CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
-
- SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- org.apache.carbondata.format.TableInfo thriftTableInfo = schemaConverter
- .fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
- tableInfo.getFactTable().getTableName());
- org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
- new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
- thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
- .add(schemaEvolutionEntry);
-
- FileFactory.FileType fileType = FileFactory.getFileType(schemaMetadataPath);
- if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
- FileFactory.mkdirs(schemaMetadataPath, fileType);
- }
-
- ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
- thriftWriter.open();
- thriftWriter.write(thriftTableInfo);
- thriftWriter.close();
- return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
- }
-
- private static void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
- BufferedReader reader = new BufferedReader(new FileReader(factFilePath));
- String header = reader.readLine();
- String[] split = header.split(",");
- List<CarbonColumn> allCols = new ArrayList<CarbonColumn>();
- List<CarbonDimension> dims = table.getDimensionByTableName(table.getTableName());
- allCols.addAll(dims);
- List<CarbonMeasure> msrs = table.getMeasureByTableName(table.getTableName());
- allCols.addAll(msrs);
- Set<String>[] set = new HashSet[dims.size()];
- for (int i = 0; i < set.length; i++) {
- set[i] = new HashSet<String>();
- }
- String line = reader.readLine();
- while (line != null) {
- String[] data = line.split(",");
- for (int i = 0; i < set.length; i++) {
- set[i].add(data[i]);
- }
- line = reader.readLine();
- }
-
- Cache dictCache = CacheProvider.getInstance()
- .createCache(CacheType.REVERSE_DICTIONARY);
- for (int i = 0; i < set.length; i++) {
- ColumnIdentifier columnIdentifier =
- new ColumnIdentifier(dims.get(i).getColumnId(), null, null);
- DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
- new DictionaryColumnUniqueIdentifier(table.getAbsoluteTableIdentifier(), columnIdentifier,
- columnIdentifier.getDataType());
- CarbonDictionaryWriter writer =
- new CarbonDictionaryWriterImpl(dictionaryColumnUniqueIdentifier);
- for (String value : set[i]) {
- writer.write(value);
- }
- writer.close();
- writer.commit();
- Dictionary dict = (Dictionary) dictCache.get(
- new DictionaryColumnUniqueIdentifier(identifier,
- columnIdentifier, dims.get(i).getDataType()));
- CarbonDictionarySortInfoPreparator preparator =
- new CarbonDictionarySortInfoPreparator();
- List<String> newDistinctValues = new ArrayList<String>();
- CarbonDictionarySortInfo dictionarySortInfo =
- preparator.getDictionarySortInfo(newDistinctValues, dict, dims.get(i).getDataType());
- CarbonDictionarySortIndexWriter carbonDictionaryWriter =
- new CarbonDictionarySortIndexWriterImpl(dictionaryColumnUniqueIdentifier);
- try {
- carbonDictionaryWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
- carbonDictionaryWriter.writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
- } finally {
- carbonDictionaryWriter.close();
- }
- }
- reader.close();
- }
-
- /**
- * Execute graph which will further load data
- *
- * @param loadModel
- * @param storeLocation
- * @throws Exception
- */
- public static void loadData(CarbonLoadModel loadModel, String storeLocation)
- throws Exception {
- new File(storeLocation).mkdirs();
- String outPutLoc = storeLocation + "/etl";
- String databaseName = loadModel.getDatabaseName();
- String tableName = loadModel.getTableName();
- String tempLocationKey = databaseName + '_' + tableName + "_1";
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
- CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
- CarbonProperties.getInstance().addProperty("send.signal.load", "false");
- CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
- CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
- CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
- CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
- CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
- CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
- CarbonProperties.getInstance().addProperty("carbon.leaf.node.size", "120000");
-
- String graphPath =
- outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
- + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
- File path = new File(graphPath);
- if (path.exists()) {
- path.delete();
- }
-
- BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
- 0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
- Configuration configuration = new Configuration();
- CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
- CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
- CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
- CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
- CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
- CSVInputFormat.setReadBufferSize(configuration, CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
- CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
- CSVInputFormat.setMaxColumns(configuration, "10");
- CSVInputFormat.setNumberOfColumns(configuration, "7");
-
- TaskAttemptContextImpl hadoopAttemptContext = new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
- CSVInputFormat format = new CSVInputFormat();
-
- RecordReader<NullWritable, StringArrayWritable> recordReader =
- format.createRecordReader(blockDetails, hadoopAttemptContext);
-
- CSVRecordReaderIterator readerIterator = new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
- String[] storeLocationArray = new String[] {storeLocation + "/" + databaseName + "/" + tableName};
- new DataLoadExecutor().execute(loadModel,
- storeLocationArray,
- new CarbonIterator[]{readerIterator});
-
- writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
- new ArrayList<LoadMetadataDetails>());
- }
-
- public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
- String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
- loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
- loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
- loadMetadataDetails.setLoadName(String.valueOf(0));
- loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
- listOfLoadFolderDetails.add(loadMetadataDetails);
-
- String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
- + CarbonTablePath.TABLE_STATUS_FILE;
-
- DataOutputStream dataOutputStream;
- Gson gsonObjectToWrite = new Gson();
- BufferedWriter brWriter = null;
-
- AtomicFileOperations writeOperation =
- new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
- try {
-
- dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
- brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
- brWriter.write(metadataInstance);
- } catch (Exception ex) {
- throw ex;
- } finally {
- try {
- if (null != brWriter) {
- brWriter.flush();
- }
- } catch (Exception e) {
- throw e;
-
- }
- CarbonUtil.closeStreams(brWriter);
-
- }
- writeOperation.close();
-
- }
-
- public static String readCurrentTime() {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
- String date = null;
-
- date = sdf.format(new Date());
-
- return date;
- }
-
- public static void main(String[] args) {
- StoreCreator.createCarbonStore();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index d15e548..946ea0f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -165,6 +165,7 @@ public class CarbonReaderBuilder {
new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = format.createRecordReader(split, attempt);
reader.initialize(split, attempt);
+ reader.close();
readers.add(reader);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
index 394ffea..9e338e7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/store/LocalCarbonStore.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
@@ -49,13 +51,15 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
@InterfaceAudience.Internal
class LocalCarbonStore extends MetaCachedCarbonStore {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LocalCarbonStore.class.getName());
+
@Override
public Iterator<CarbonRow> scan(String path, String[] projectColumns) throws IOException {
return scan(path, projectColumns, null);
}
- @Override
- public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter)
+ @Override public Iterator<CarbonRow> scan(String path, String[] projectColumns, Expression filter)
throws IOException {
Objects.requireNonNull(path);
Objects.requireNonNull(projectColumns);
@@ -73,8 +77,8 @@ class LocalCarbonStore extends MetaCachedCarbonStore {
CarbonInputFormat.setTableName(job.getConfiguration(), table.getTableName());
CarbonInputFormat.setDatabaseName(job.getConfiguration(), table.getDatabaseName());
CarbonInputFormat.setCarbonReadSupport(job.getConfiguration(), CarbonRowReadSupport.class);
- CarbonInputFormat.setColumnProjection(
- job.getConfiguration(), new CarbonProjection(projectColumns));
+ CarbonInputFormat
+ .setColumnProjection(job.getConfiguration(), new CarbonProjection(projectColumns));
if (filter != null) {
CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter);
}
@@ -84,6 +88,8 @@ class LocalCarbonStore extends MetaCachedCarbonStore {
List<RecordReader<Void, Object>> readers = new ArrayList<>(splits.size());
+ List<CarbonRow> rows = new ArrayList<>();
+
try {
for (InputSplit split : splits) {
TaskAttemptContextImpl attempt =
@@ -92,19 +98,27 @@ class LocalCarbonStore extends MetaCachedCarbonStore {
reader.initialize(split, attempt);
readers.add(reader);
}
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- List<CarbonRow> rows = new ArrayList<>();
- try {
for (RecordReader<Void, Object> reader : readers) {
while (reader.nextKeyValue()) {
- rows.add((CarbonRow)reader.getCurrentValue());
+ rows.add((CarbonRow) reader.getCurrentValue());
+ }
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOGGER.error(e);
}
}
} catch (InterruptedException e) {
throw new IOException(e);
+ } finally {
+ for (RecordReader<Void, Object> reader : readers) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOGGER.error(e);
+ }
+ }
}
return rows.iterator();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index 445b292..9727352 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -128,11 +128,12 @@ public class SearchRequestHandler {
// In search mode, reader will read multiple blocks by using a thread pool
CarbonRecordReader<CarbonRow> reader =
new CarbonRecordReader<>(queryModel, new CarbonRowReadSupport());
- reader.initialize(mbSplit, null);
// read all rows by the reader
List<CarbonRow> rows = new LinkedList<>();
try {
+ reader.initialize(mbSplit, null);
+
// loop to read required number of rows.
// By default, if user does not specify the limit value, limit is Long.MaxValue
while (reader.nextKeyValue() && rowCount < limit) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7ef91645/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
index cbf93b8..c4b501d 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -414,9 +414,6 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
private boolean isScanRequired(BlockletHeader header) {
// TODO require to implement min-max index
- if (null == filter) {
- return true;
- }
return true;
}