Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:18 UTC
[02/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index e0d4b73..70a8703 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -50,7 +50,7 @@ import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
* <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
* <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
*/
-public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> {
+public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonFactDataWriterImplV3.class.getName());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
deleted file mode 100644
index b93fcb7..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordsLogger.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.surrogatekeysgenerator.csvbased;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-public class BadRecordsLogger {
-
- /**
- * Comment for <code>LOGGER</code>
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(BadRecordsLogger.class.getName());
- /**
- * Holds the task key for each load; if any bad record is found, the API
- * checks this map to update the load status
- */
- private static Map<String, String> badRecordEntry =
- new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- /**
- * File Name
- */
- private String fileName;
- /**
- * Store path
- */
- private String storePath;
- /**
- * FileChannel
- */
- private BufferedWriter bufferedWriter;
- private DataOutputStream outStream;
- /**
- * csv file writer
- */
- private BufferedWriter bufferedCSVWriter;
- private DataOutputStream outCSVStream;
- /**
- * bad record log file path
- */
- private String logFilePath;
- /**
- * csv file path
- */
- private String csvFilePath;
-
- /**
- * task key which is DatabaseName/TableName/tablename
- */
- private String taskKey;
-
- private boolean badRecordsLogRedirect;
-
- private boolean badRecordLoggerEnable;
-
- private boolean badRecordConvertNullDisable;
-
- private boolean isDataLoadFail;
-
- // private final Object syncObject =new Object();
-
- public BadRecordsLogger(String key, String fileName, String storePath,
- boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
- boolean badRecordConvertNullDisable, boolean isDataLoadFail) {
- // Initially no bad rec
- taskKey = key;
- this.fileName = fileName;
- this.storePath = storePath;
- this.badRecordsLogRedirect = badRecordsLogRedirect;
- this.badRecordLoggerEnable = badRecordLoggerEnable;
- this.badRecordConvertNullDisable = badRecordConvertNullDisable;
- this.isDataLoadFail = isDataLoadFail;
- }
-
- /**
- * @param key DatabaseName/TableName/tablename
- * @return "Partially" if any bad record was logged for the key, otherwise null
- */
- public static String hasBadRecord(String key) {
- return badRecordEntry.get(key);
- }
-
- /**
- * @param key DatabaseName/TableName/tablename
- * @return the value previously associated with the key, after removing it from the map
- */
- public static String removeBadRecordKey(String key) {
- return badRecordEntry.remove(key);
- }
-
- public void addBadRecordsToBuilder(Object[] row, String reason)
- throws CarbonDataLoadingException {
- if (badRecordsLogRedirect || badRecordLoggerEnable) {
- StringBuilder logStrings = new StringBuilder();
- int size = row.length;
- int count = size;
- for (int i = 0; i < size; i++) {
- if (null == row[i]) {
- char ch =
- logStrings.length() > 0 ? logStrings.charAt(logStrings.length() - 1) : (char) -1;
- if (ch == ',') {
- logStrings = logStrings.deleteCharAt(logStrings.lastIndexOf(","));
- }
- break;
- } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
- logStrings.append("null");
- } else {
- logStrings.append(row[i]);
- }
- if (count > 1) {
- logStrings.append(',');
- }
- count--;
- }
- if (badRecordsLogRedirect) {
- writeBadRecordsToCSVFile(logStrings);
- }
- if (badRecordLoggerEnable) {
- logStrings.append("----->");
- if (null != reason) {
- if (reason.indexOf(CarbonCommonConstants.MEMBER_DEFAULT_VAL) > -1) {
- logStrings
- .append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, "null"));
- } else {
- logStrings.append(reason);
- }
- }
- writeBadRecordsToFile(logStrings);
- }
- } else {
- // setting partial success entry since even if bad records are there then load
- // status should be partial success regardless of bad record logged
- badRecordEntry.put(taskKey, "Partially");
- }
- }
-
- /**
- * writes the bad record row and the reason into the bad record log file
- */
- private synchronized void writeBadRecordsToFile(StringBuilder logStrings)
- throws CarbonDataLoadingException {
- if (null == logFilePath) {
- logFilePath =
- this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION
- + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
- }
- try {
- if (null == bufferedWriter) {
- FileType fileType = FileFactory.getFileType(storePath);
- if (!FileFactory.isFileExist(this.storePath, fileType)) {
- // create the folders if not exist
- FileFactory.mkdirs(this.storePath, fileType);
-
- // create the files
- FileFactory.createNewFile(logFilePath, fileType);
- }
-
- outStream = FileFactory.getDataOutputStream(logFilePath, fileType);
-
- bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- }
- bufferedWriter.write(logStrings.toString());
- bufferedWriter.newLine();
- } catch (FileNotFoundException e) {
- LOGGER.error("Bad Log Files not found");
- throw new CarbonDataLoadingException("Bad Log Files not found", e);
- } catch (IOException e) {
- LOGGER.error("Error While writing bad record log File");
- throw new CarbonDataLoadingException("Error While writing bad record log File", e);
- } finally {
- // if the Bad record file is created means it partially success
- // if any entry present with key that means its have bad record for
- // that key
- badRecordEntry.put(taskKey, "Partially");
- }
- }
-
- /**
- * method will write the row having bad record in the csv file.
- *
- * @param logStrings
- */
- private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings)
- throws CarbonDataLoadingException {
- if (null == csvFilePath) {
- csvFilePath =
- this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION
- + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
- }
- try {
- if (null == bufferedCSVWriter) {
- FileType fileType = FileFactory.getFileType(storePath);
- if (!FileFactory.isFileExist(this.storePath, fileType)) {
- // create the folders if not exist
- FileFactory.mkdirs(this.storePath, fileType);
-
- // create the files
- FileFactory.createNewFile(csvFilePath, fileType);
- }
-
- outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType);
-
- bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- }
- bufferedCSVWriter.write(logStrings.toString());
- bufferedCSVWriter.newLine();
- } catch (FileNotFoundException e) {
- LOGGER.error("Bad record csv Files not found");
- throw new CarbonDataLoadingException("Bad record csv Files not found", e);
- } catch (IOException e) {
- LOGGER.error("Error While writing bad record csv File");
- throw new CarbonDataLoadingException("Error While writing bad record csv File", e);
- }
- finally {
- badRecordEntry.put(taskKey, "Partially");
- }
- }
-
- public boolean isBadRecordConvertNullDisable() {
- return badRecordConvertNullDisable;
- }
-
- public boolean isDataLoadFail() {
- return isDataLoadFail;
- }
-
- public boolean isBadRecordLoggerEnable() {
- return badRecordLoggerEnable;
- }
-
- public boolean isBadRecordsLogRedirect() {
- return badRecordsLogRedirect;
- }
-
- /**
- * closeStreams void
- */
- public synchronized void closeStreams() {
- CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
- }
-
-}
-
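A minimal usage sketch of the removed BadRecordsLogger API as it stood before this cleanup; the task key, file name and store path below are illustrative values, not taken from the commit:

import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;

public class BadRecordsLoggerSketch {
  // hypothetical task key of the form DatabaseName/TableName/tablename
  static void logOneBadRow() throws CarbonDataLoadingException {
    String taskKey = "default/t1/t1";
    BadRecordsLogger logger = new BadRecordsLogger(taskKey, "t1_badrecords", "/tmp/badrecords",
        true,   // badRecordsLogRedirect: also write the raw row to a CSV file
        true,   // badRecordLoggerEnable: write the row and reason to the log file
        false,  // badRecordConvertNullDisable
        false); // isDataLoadFail
    try {
      // logs the row along with the reason why it was rejected
      logger.addBadRecordsToBuilder(new Object[] { "1", null }, "age column is empty");
    } finally {
      logger.closeStreams();
    }
    // once a bad record is logged, the load status for the key is marked "Partially"
    System.out.println(BadRecordsLogger.hasBadRecord(taskKey));
  }
}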
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index fabb5a5..79e49ef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -52,10 +52,10 @@ import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
-import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
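The hunk above reflects the package rename from processing.newflow and processing.model to processing.loading; a minimal sketch of where a downstream class would now import the load types from (field names are illustrative):

import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;

public class LoadingPackageImportsSketch {
  // the types themselves are unchanged; only their packages moved in this cleanup
  private CarbonDataLoadConfiguration configuration;
  private DataField[] fields;
  private CarbonDataLoadSchema schema;
}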
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
new file mode 100644
index 0000000..8681269
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.util;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.datastore.row.LoadStatusType;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.merger.NodeBlockRelation;
+import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+
+public final class CarbonLoaderUtil {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+
+ private CarbonLoaderUtil() {
+ }
+
+ public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) {
+ CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+ for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + "");
+ deleteStorePath(segmentPath);
+ }
+ }
+
+ /**
+ * the method returns true if the segment has a carbondata or index file, else returns false.
+ *
+ * @param loadModel
+ * @param currentLoad
+ * @return
+ */
+ public static boolean isValidSegment(CarbonLoadModel loadModel,
+ int currentLoad) {
+ CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema()
+ .getCarbonTable();
+ CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+ loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+ int fileCount = 0;
+ int partitionCount = carbonTable.getPartitionCount();
+ for (int i = 0; i < partitionCount; i++) {
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "",
+ currentLoad + "");
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath,
+ FileFactory.getFileType(segmentPath));
+ CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+
+ @Override
+ public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(
+ CarbonTablePath.getCarbonIndexExtension())
+ || file.getName().endsWith(
+ CarbonTablePath.getCarbonDataExtension());
+ }
+
+ });
+ fileCount += files.length;
+ if (files.length > 0) {
+ return true;
+ }
+ }
+ if (fileCount == 0) {
+ return false;
+ }
+ return true;
+ }
+ public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel,
+ final boolean isCompactionFlow) throws IOException {
+ CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+ String metaDataLocation = carbonTable.getMetaDataFilepath();
+ final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier());
+
+ // delete folders whose metadata entry does not exist in tablestatus
+ for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
+ final String partitionCount = i + "";
+ String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+ FileType fileType = FileFactory.getFileType(partitionPath);
+ if (FileFactory.isFileExist(partitionPath, fileType)) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
+ CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile path) {
+ String segmentId =
+ CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+ boolean found = false;
+ for (int j = 0; j < details.length; j++) {
+ if (details[j].getLoadName().equals(segmentId) && details[j].getPartitionCount()
+ .equals(partitionCount)) {
+ found = true;
+ break;
+ }
+ }
+ return !found;
+ }
+ });
+ for (int k = 0; k < listFiles.length; k++) {
+ String segmentId =
+ CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+ if (isCompactionFlow) {
+ if (segmentId.contains(".")) {
+ deleteStorePath(listFiles[k].getAbsolutePath());
+ }
+ } else {
+ if (!segmentId.contains(".")) {
+ deleteStorePath(listFiles[k].getAbsolutePath());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private static void deleteStorePath(String path) {
+ try {
+ FileType fileType = FileFactory.getFileType(path);
+ if (FileFactory.isFileExist(path, fileType)) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(path, fileType);
+ CarbonUtil.deleteFoldersAndFiles(carbonFile);
+ }
+ } catch (IOException | InterruptedException e) {
+ LOGGER.error("Unable to delete the given path :: " + e.getMessage());
+ }
+ }
+
+
+ /**
+ * This method will delete the local data load folder location after data load is complete
+ *
+ * @param loadModel
+ */
+ public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel,
+ boolean isCompactionFlow, boolean isAltPartitionFlow) {
+ String databaseName = loadModel.getDatabaseName();
+ String tableName = loadModel.getTableName();
+ String tempLocationKey = CarbonDataProcessorUtil
+ .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+ loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
+ // form local store location
+ final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey);
+ if (localStoreLocations == null) {
+ throw new RuntimeException("Store location not set for the key " + tempLocationKey);
+ }
+ // submit local folder clean up in another thread so that main thread execution is not blocked
+ ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
+ try {
+ localFolderDeletionService.submit(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ long startTime = System.currentTimeMillis();
+ String[] locArray = StringUtils.split(localStoreLocations, File.pathSeparator);
+ for (String loc : locArray) {
+ try {
+ CarbonUtil.deleteFoldersAndFiles(new File(loc));
+ } catch (IOException | InterruptedException e) {
+ LOGGER.error(e,
+ "Failed to delete local data load folder location: " + loc);
+ }
+ }
+ LOGGER.info("Deleted the local store location: " + localStoreLocations
+ + " : Time taken: " + (System.currentTimeMillis() - startTime));
+ return null;
+ }
+ });
+ } finally {
+ if (null != localFolderDeletionService) {
+ localFolderDeletionService.shutdown();
+ }
+ }
+
+ }
+
+ /**
+ * This API will write the load level metadata for the load management module in order to
+ * manage load and query execution smoothly.
+ *
+ * @param newMetaEntry
+ * @param loadModel
+ * @return boolean which determines whether the status update is done or not.
+ * @throws IOException
+ */
+ public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry,
+ CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
+ throws IOException, InterruptedException {
+ boolean status = false;
+ String metaDataFilepath =
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
+ String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+ try {
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info(
+ "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
+ + " for table status updation");
+ LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+ SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+ List<LoadMetadataDetails> listOfLoadFolderDetails =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<CarbonFile> staleFolders = new ArrayList<>();
+ Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
+ // create a new segment Id if load has just begun else add the already generated Id
+ if (loadStartEntry) {
+ String segmentId =
+ String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
+ newMetaEntry.setLoadName(segmentId);
+ loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
+ loadModel.setSegmentId(segmentId);
+ // Exception should be thrown if:
+ // 1. If insert overwrite is in progress and any other load or insert operation
+ // is triggered
+ // 2. If load or insert into operation is in progress and insert overwrite operation
+ // is triggered
+ for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+ if (entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
+ throw new RuntimeException("Already insert overwrite is in progress");
+ } else if (
+ newMetaEntry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())
+ && entry.getLoadStatus().equals(LoadStatusType.IN_PROGRESS.getMessage())) {
+ throw new RuntimeException("Already insert into or load is in progress");
+ }
+ }
+ listOfLoadFolderDetails.add(newMetaEntry);
+ } else {
+ newMetaEntry.setLoadName(String.valueOf(loadModel.getSegmentId()));
+ // existing entry needs to be overwritten as the entry will exist with some
+ // intermediate status
+ int indexToOverwriteNewMetaEntry = 0;
+ for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+ if (entry.getLoadName().equals(newMetaEntry.getLoadName())
+ && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
+ break;
+ }
+ indexToOverwriteNewMetaEntry++;
+ }
+ if (listOfLoadFolderDetails.get(indexToOverwriteNewMetaEntry).getLoadStatus()
+ .equals(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+ throw new RuntimeException("It seems insert overwrite has been issued during load");
+ }
+ if (insertOverwrite) {
+ for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+ if (!entry.getLoadStatus().equals(LoadStatusType.INSERT_OVERWRITE.getMessage())) {
+ entry.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+ // For insert overwrite, we will delete the old segment folder immediately
+ // So collect the old segments here
+ String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
+ // add to the deletion list only if file exist else HDFS file system will throw
+ // exception while deleting the file if file path does not exist
+ if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+ staleFolders.add(FileFactory.getCarbonFile(path));
+ }
+ }
+ }
+ }
+ listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
+ .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+ // Delete all old stale segment folders
+ for (CarbonFile staleFolder : staleFolders) {
+ // try block is inside for loop because even if there is failure in deletion of 1 stale
+ // folder still remaining stale folders should be deleted
+ try {
+ CarbonUtil.deleteFoldersAndFiles(staleFolder);
+ } catch (IOException | InterruptedException e) {
+ LOGGER.error("Failed to delete stale folder: " + e.getMessage());
+ }
+ }
+ status = true;
+ } else {
+ LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
+ .getDatabaseName() + "." + loadModel.getTableName());
+ };
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info(
+ "Table unlocked successfully after table status updation" + loadModel.getDatabaseName()
+ + "." + loadModel.getTableName());
+ } else {
+ LOGGER.error(
+ "Unable to unlock Table lock for table" + loadModel.getDatabaseName() + "." + loadModel
+ .getTableName() + " during table status updation");
+ }
+ }
+ return status;
+ }
+
+ /**
+ * Method to create new entry for load in table status file
+ *
+ * @param loadMetadataDetails
+ * @param loadStatus
+ * @param loadStartTime
+ * @param addLoadEndTime
+ */
+ public static void populateNewLoadMetaEntry(LoadMetadataDetails loadMetadataDetails,
+ String loadStatus, long loadStartTime, boolean addLoadEndTime) {
+ if (addLoadEndTime) {
+ long loadEndDate = CarbonUpdateUtil.readCurrentTime();
+ loadMetadataDetails.setLoadEndTime(loadEndDate);
+ }
+ loadMetadataDetails.setLoadStatus(loadStatus);
+ loadMetadataDetails.setLoadStartTime(loadStartTime);
+ }
+
+ public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
+ List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(storeLocation, dbName, tableName);
+ String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
+
+ DataOutputStream dataOutputStream;
+ Gson gsonObjectToWrite = new Gson();
+ BufferedWriter brWriter = null;
+
+ AtomicFileOperations writeOperation =
+ new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+ try {
+
+ dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+ brWriter.write(metadataInstance);
+ } finally {
+ try {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ } catch (Exception e) {
+ LOGGER.error("error in flushing ");
+
+ }
+ CarbonUtil.closeStreams(brWriter);
+ writeOperation.close();
+ }
+
+ }
+
+ public static String readCurrentTime() {
+ SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+ String date = null;
+
+ date = sdf.format(new Date());
+
+ return date;
+ }
+
+ public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
+ String carbonStorePath) throws IOException {
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
+ CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, carbonStorePath);
+ return dictCache.get(columnIdentifier);
+ }
+
+ public static Dictionary getDictionary(CarbonTableIdentifier tableIdentifier,
+ ColumnIdentifier columnIdentifier, String carbonStorePath, DataType dataType)
+ throws IOException {
+ return getDictionary(
+ new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier, dataType,
+ CarbonStorePath.getCarbonTablePath(carbonStorePath, tableIdentifier)),
+ carbonStorePath);
+ }
+
+ /**
+ * This method will divide the blocks among the tasks of the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @param noOfNodesInput -1 if number of nodes has to be decided
+ * based on block location information
+ * @param parallelism total no of tasks to execute in parallel
+ * @return
+ */
+ public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
+ List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
+ List<String> activeNode) {
+
+ Map<String, List<Distributable>> mapOfNodes =
+ CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+ int taskPerNode = parallelism / mapOfNodes.size();
+ //assigning non zero value to noOfTasksPerNode
+ int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
+ // divide the blocks of a node among the tasks of the node.
+ return assignBlocksToTasksPerNode(mapOfNodes, noOfTasksPerNode);
+ }
+
+ /**
+ * This method will divide the blocks among the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @return
+ */
+ public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+ int noOfNodesInput) {
+ return nodeBlockMapping(blockInfos, noOfNodesInput, null);
+ }
+
+ /**
+ * This method will divide the blocks among the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @return
+ */
+ public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
+ // -1 if number of nodes has to be decided based on block location information
+ return nodeBlockMapping(blockInfos, -1);
+ }
+
+ /**
+ * the method returns the number of required executors
+ *
+ * @param blockInfos
+ * @return
+ */
+ public static Map<String, List<Distributable>> getRequiredExecutors(
+ List<Distributable> blockInfos) {
+ List<NodeBlockRelation> flattenedList =
+ new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (Distributable blockInfo : blockInfos) {
+ try {
+ for (String eachNode : blockInfo.getLocations()) {
+ NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+ flattenedList.add(nbr);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+ }
+ }
+ // sort the flattened data.
+ Collections.sort(flattenedList);
+ Map<String, List<Distributable>> nodeAndBlockMapping =
+ new LinkedHashMap<String, List<Distributable>>(
+ CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ // from the flattened list create a mapping of node vs Data blocks.
+ createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+ return nodeAndBlockMapping;
+ }
+
+ /**
+ * This method will divide the blocks among the nodes as per the data locality
+ *
+ * @param blockInfos
+ * @param noOfNodesInput -1 if number of nodes has to be decided
+ * based on block location information
+ * @return
+ */
+ public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos,
+ int noOfNodesInput, List<String> activeNodes) {
+
+ Map<String, List<Distributable>> nodeBlocksMap =
+ new HashMap<String, List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ List<NodeBlockRelation> flattenedList =
+ new ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ Set<Distributable> uniqueBlocks =
+ new HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ Set<String> nodes = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
+
+ int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
+ if (null != activeNodes) {
+ noofNodes = activeNodes.size();
+ }
+ int blocksPerNode = blockInfos.size() / noofNodes;
+ blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
+
+ // sort the flattened data.
+ Collections.sort(flattenedList);
+
+ Map<String, List<Distributable>> nodeAndBlockMapping =
+ new LinkedHashMap<String, List<Distributable>>(
+ CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ // from the flattened list create a mapping of node vs Data blocks.
+ createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+
+ // so now we have a map of node vs blocks. allocate the block as per the order
+ createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
+
+ // if any blocks remain then assign them to nodes in round robin.
+ assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
+
+ return nodeBlocksMap;
+ }
+
+ /**
+ * Assigning the blocks of a node to tasks.
+ *
+ * @param nodeBlocksMap nodeName to list of blocks mapping
+ * @param noOfTasksPerNode
+ * @return
+ */
+ private static Map<String, List<List<Distributable>>> assignBlocksToTasksPerNode(
+ Map<String, List<Distributable>> nodeBlocksMap, int noOfTasksPerNode) {
+ Map<String, List<List<Distributable>>> outputMap =
+ new HashMap<String, List<List<Distributable>>>(
+ CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ // for each node
+ for (Map.Entry<String, List<Distributable>> eachNode : nodeBlocksMap.entrySet()) {
+
+ List<Distributable> blockOfEachNode = eachNode.getValue();
+ // sorting the blocks so the same block will be given to the same executor
+ Collections.sort(blockOfEachNode);
+ // create the task list for each node.
+ createTaskListForNode(outputMap, noOfTasksPerNode, eachNode.getKey());
+
+ // take all the block of node and divide it among the tasks of a node.
+ divideBlockToTasks(outputMap, eachNode.getKey(), blockOfEachNode);
+ }
+
+ return outputMap;
+ }
+
+ /**
+ * This will divide the blocks of a node to tasks of the node.
+ *
+ * @param outputMap
+ * @param key
+ * @param blockOfEachNode
+ */
+ private static void divideBlockToTasks(Map<String, List<List<Distributable>>> outputMap,
+ String key, List<Distributable> blockOfEachNode) {
+
+ List<List<Distributable>> taskLists = outputMap.get(key);
+ int tasksOfNode = taskLists.size();
+ int i = 0;
+ for (Distributable block : blockOfEachNode) {
+
+ taskLists.get(i % tasksOfNode).add(block);
+ i++;
+ }
+
+ }
+
+ /**
+ * This will create the empty list for each task of a node.
+ *
+ * @param outputMap
+ * @param noOfTasksPerNode
+ * @param key
+ */
+ private static void createTaskListForNode(Map<String, List<List<Distributable>>> outputMap,
+ int noOfTasksPerNode, String key) {
+ List<List<Distributable>> nodeTaskList =
+ new ArrayList<List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ for (int i = 0; i < noOfTasksPerNode; i++) {
+ List<Distributable> eachTask =
+ new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ nodeTaskList.add(eachTask);
+
+ }
+ outputMap.put(key, nodeTaskList);
+
+ }
+
+ /**
+ * If any left over data blocks are present then assign those to nodes in round robin way.
+ *
+ * @param outputMap
+ * @param uniqueBlocks
+ */
+ private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
+ Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
+
+ if (activeNodes != null) {
+ for (String activeNode : activeNodes) {
+ List<Distributable> blockLst = outputMap.get(activeNode);
+ if (null == blockLst) {
+ blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ }
+ populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+ if (blockLst.size() > 0) {
+ outputMap.put(activeNode, blockLst);
+ }
+ }
+ } else {
+ for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+ List<Distributable> blockLst = entry.getValue();
+ populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+ }
+
+ }
+
+ for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+ Iterator<Distributable> blocks = uniqueBlocks.iterator();
+ if (blocks.hasNext()) {
+ Distributable block = blocks.next();
+ List<Distributable> blockLst = entry.getValue();
+ blockLst.add(block);
+ blocks.remove();
+ }
+ }
+ }
+
+ /**
+ * The method populates the blockLst to be allocated to a specific node.
+ * @param uniqueBlocks
+ * @param noOfBlocksPerNode
+ * @param blockLst
+ */
+ private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
+ List<Distributable> blockLst) {
+ Iterator<Distributable> blocks = uniqueBlocks.iterator();
+ // if the node already has its quota of blocks then avoid assigning extra blocks
+ if (blockLst.size() == noOfBlocksPerNode) {
+ return;
+ }
+ while (blocks.hasNext()) {
+ Distributable block = blocks.next();
+ blockLst.add(block);
+ blocks.remove();
+ if (blockLst.size() >= noOfBlocksPerNode) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * To create the final output of the Node and Data blocks
+ *
+ * @param outputMap
+ * @param blocksPerNode
+ * @param uniqueBlocks
+ * @param nodeAndBlockMapping
+ * @param activeNodes
+ */
+ private static void createOutputMap(Map<String, List<Distributable>> outputMap, int blocksPerNode,
+ Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> nodeAndBlockMapping,
+ List<String> activeNodes) {
+
+ ArrayList<NodeMultiBlockRelation> multiBlockRelations =
+ new ArrayList<>(nodeAndBlockMapping.size());
+ for (Map.Entry<String, List<Distributable>> entry : nodeAndBlockMapping.entrySet()) {
+ multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), entry.getValue()));
+ }
+ // sort nodes based on number of blocks per node, so that nodes having lesser blocks
+ // are assigned first
+ Collections.sort(multiBlockRelations);
+
+ for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+ String nodeName = nodeMultiBlockRelation.getNode();
+ //assign the block to the node only if the node is active
+ String activeExecutor = nodeName;
+ if (null != activeNodes) {
+ activeExecutor = getActiveExecutor(activeNodes, nodeName);
+ if (null == activeExecutor) {
+ continue;
+ }
+ }
+ // this loop will be for each NODE
+ int nodeCapacity = 0;
+ // loop thru blocks of each Node
+ for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
+
+ // check if this is already assigned.
+ if (uniqueBlocks.contains(block)) {
+
+ if (null == outputMap.get(activeExecutor)) {
+ List<Distributable> list =
+ new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ outputMap.put(activeExecutor, list);
+ }
+ // assign this block to this node if node has capacity left
+ if (nodeCapacity < blocksPerNode) {
+ List<Distributable> infos = outputMap.get(activeExecutor);
+ infos.add(block);
+ nodeCapacity++;
+ uniqueBlocks.remove(block);
+ } else {
+ // No need to continue loop as node is full
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * method validates whether the node is active or not.
+ *
+ * @param activeNode
+ * @param nodeName
+ * @return the matching active node name (or host name/address) if the node is active, else null.
+ */
+ private static String getActiveExecutor(List activeNode, String nodeName) {
+ boolean isActiveNode = activeNode.contains(nodeName);
+ if (isActiveNode) {
+ return nodeName;
+ }
+ //if localhost then retrieve the localhost name then do the check
+ else if (nodeName.equals("localhost")) {
+ try {
+ String hostName = InetAddress.getLocalHost().getHostName();
+ isActiveNode = activeNode.contains(hostName);
+ if (isActiveNode) {
+ return hostName;
+ }
+ } catch (UnknownHostException ue) {
+ isActiveNode = false;
+ }
+ } else {
+ try {
+ String hostAddress = InetAddress.getByName(nodeName).getHostAddress();
+ isActiveNode = activeNode.contains(hostAddress);
+ if (isActiveNode) {
+ return hostAddress;
+ }
+ } catch (UnknownHostException ue) {
+ isActiveNode = false;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Create the Node and its related blocks Mapping and put in a Map
+ *
+ * @param flattenedList
+ * @param nodeAndBlockMapping
+ */
+ private static void createNodeVsBlockMapping(List<NodeBlockRelation> flattenedList,
+ Map<String, List<Distributable>> nodeAndBlockMapping) {
+ for (NodeBlockRelation nbr : flattenedList) {
+ String node = nbr.getNode();
+ List<Distributable> list;
+
+ if (null == nodeAndBlockMapping.get(node)) {
+ list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ list.add(nbr.getBlock());
+ nodeAndBlockMapping.put(node, list);
+ } else {
+ list = nodeAndBlockMapping.get(node);
+ list.add(nbr.getBlock());
+ }
+ }
+ /* To resolve a performance issue, values() was replaced with entrySet() when iterating and
+ sorting. entrySet() gives a logical view of the HashMap, so we do not query the map twice for
+ each key, whereas iterating values() does so twice. */
+ Iterator<Map.Entry<String, List<Distributable>>> iterator =
+ nodeAndBlockMapping.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Collections.sort(iterator.next().getValue());
+ }
+ }
+
+ /**
+ * Create the flat List i.e flattening of the Map.
+ *
+ * @param blockInfos
+ * @param flattenedList
+ * @param uniqueBlocks
+ */
+ private static void createFlattenedListFromMap(List<Distributable> blockInfos,
+ List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
+ Set<String> nodeList) {
+ for (Distributable blockInfo : blockInfos) {
+ // put the blocks in the set
+ uniqueBlocks.add(blockInfo);
+
+ try {
+ for (String eachNode : blockInfo.getLocations()) {
+ NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
+ flattenedList.add(nbr);
+ nodeList.add(eachNode);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e);
+ }
+ }
+ }
+
+ /**
+ * This method will check and create the carbon data directory for the given store path and segment id
+ *
+ * @param carbonStorePath
+ * @param segmentId
+ */
+ public static void checkAndCreateCarbonDataLocation(String carbonStorePath,
+ String segmentId, CarbonTable carbonTable) {
+ CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+ CarbonTablePath carbonTablePath =
+ CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+ String carbonDataDirectoryPath =
+ carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+ }
+
+ /**
+ * This will carry the segment visibility from the old table status details (taken before
+ * clean files) into the latest table status.
+ * @param oldList
+ * @param newList
+ * @return
+ */
+ public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+ LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
+
+ List<LoadMetadataDetails> newListMetadata =
+ new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
+ for (LoadMetadataDetails oldSegment : oldList) {
+ if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
+ newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
+ }
+ }
+ return newListMetadata;
+ }
+
+}
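A minimal sketch of how the block-to-node mapping APIs above could be driven; the block list, parallelism and active node list are assumed to come from the caller:

import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;

public class NodeBlockMappingSketch {
  // node-to-blocks mapping based only on data locality (-1 lets the node count be derived
  // from the block location information)
  static Map<String, List<Distributable>> planNodes(List<Distributable> blocks) {
    return CarbonLoaderUtil.nodeBlockMapping(blocks);
  }

  // additionally divide each node's blocks among its tasks for the given parallelism
  static Map<String, List<List<Distributable>>> planTasks(List<Distributable> blocks,
      int parallelism, List<String> activeNodes) {
    return CarbonLoaderUtil.nodeBlockTaskMapping(blocks, -1, parallelism, activeNodes);
  }
}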
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
new file mode 100644
index 0000000..ec91472
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.processing.partition.Partition;
+import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer;
+import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl;
+import org.apache.carbondata.processing.partition.impl.QueryPartitionHelper;
+import org.apache.carbondata.processing.splits.TableSplit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This utility parses the Carbon query plan into the actual query model object.
+ */
+public class CarbonQueryUtil {
+
+ private CarbonQueryUtil() {
+
+ }
+
+ /**
+ * It creates one split for each region server.
+ */
+ public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
+ CarbonQueryPlan queryPlan) {
+
+ // Just create splits depending on the locations of region servers
+ List<Partition> allPartitions = null;
+ if (queryPlan == null) {
+ allPartitions =
+ QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
+ } else {
+ allPartitions =
+ QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+ }
+ TableSplit[] splits = new TableSplit[allPartitions.size()];
+ for (int i = 0; i < splits.length; i++) {
+ splits[i] = new TableSplit();
+ List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ Partition partition = allPartitions.get(i);
+ String location = QueryPartitionHelper.getInstance()
+ .getLocation(partition, databaseName, tableName);
+ locations.add(location);
+ splits[i].setPartition(partition);
+ splits[i].setLocations(locations);
+ }
+
+ return splits;
+ }
+
+ /**
+ * It creates one split for each region server.
+ */
+ public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) {
+
+ // Just create splits depending on the locations of region servers
+ DefaultLoadBalancer loadBalancer = null;
+ List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+ loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
+ TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
+ for (int i = 0; i < tblSplits.length; i++) {
+ tblSplits[i] = new TableSplit();
+ List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ Partition partition = allPartitions.get(i);
+ String location = loadBalancer.getNodeForPartitions(partition);
+ locations.add(location);
+ tblSplits[i].setPartition(partition);
+ tblSplits[i].setLocations(locations);
+ }
+ return tblSplits;
+ }
+
+ /**
+ * split sourcePath by comma
+ */
+ public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
+ String separator) {
+ if (StringUtils.isNotEmpty(sourcePath)) {
+ String[] files = sourcePath.split(separator);
+ Collections.addAll(partitionsFiles, files);
+ }
+ }
+
+ private static List<Partition> getAllFilesForDataLoad(String sourcePath) {
+ List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
+ List<Partition> partitionList =
+ new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
+
+ partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
+ partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
+
+ for (int i = 0; i < files.size(); i++) {
+ partitionFiles.get(0).add(files.get(i));
+ }
+ return partitionList;
+ }
+
+}
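A small example of the splitFilePath helper above; the paths are illustrative:

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.processing.util.CarbonQueryUtil;

public class SplitFilePathSketch {
  public static void main(String[] args) {
    List<String> partitionFiles = new ArrayList<>();
    // splits the comma separated source path and collects each file into the list
    CarbonQueryUtil.splitFilePath("/data/load1.csv,/data/load2.csv", partitionFiles, ",");
    System.out.println(partitionFiles);  // [/data/load1.csv, /data/load2.csv]
  }
}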
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
new file mode 100644
index 0000000..d668cc2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+public final class DeleteLoadFolders {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
+
+ private DeleteLoadFolders() {
+
+ }
+
+ /**
+ * returns segment path
+ *
+ * @param dbName
+ * @param tableName
+ * @param storeLocation
+ * @param partitionId
+ * @param oneLoad
+ * @return
+ */
+ private static String getSegmentPath(String dbName, String tableName, String storeLocation,
+ int partitionId, LoadMetadataDetails oneLoad) {
+ CarbonTablePath carbon = new CarbonStorePath(storeLocation).getCarbonTablePath(
+ new CarbonTableIdentifier(dbName, tableName, ""));
+ String segmentId = oneLoad.getLoadName();
+ return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId);
+ }
+
+ private static boolean physicalFactAndMeasureMetadataDeletion(String path) {
+
+ boolean status = false;
+ try {
+ if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+ CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+ CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile file) {
+ return (CarbonTablePath.isCarbonDataFile(file.getName())
+ || CarbonTablePath.isCarbonIndexFile(file.getName()));
+ }
+ });
+
+ // if no fact or measure data files are present then there is no need to keep the
+ // entry in metadata.
+ if (filesToBeDeleted.length == 0) {
+ status = true;
+ } else {
+
+ for (CarbonFile eachFile : filesToBeDeleted) {
+ if (!eachFile.delete()) {
+ LOGGER.warn("Unable to delete the file as per delete command "
+ + eachFile.getAbsolutePath());
+ status = false;
+ } else {
+ status = true;
+ }
+ }
+ }
+ // need to delete the complete folder.
+ if (status) {
+ if (!file.delete()) {
+ LOGGER.warn("Unable to delete the folder as per delete command "
+ + file.getAbsolutePath());
+ status = false;
+ }
+ }
+
+ } else {
+ status = false;
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Unable to delete the file as per delete command " + path);
+ }
+
+ return status;
+
+ }
+
+ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
+ boolean isForceDelete) {
+ if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
+ || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
+ && oneLoad.getVisibility().equalsIgnoreCase("true")) {
+ if (isForceDelete) {
+ return true;
+ }
+ long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+
+ return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+
+ }
+
+ return false;
+ }
+
+ public static boolean deleteLoadFoldersFromFileSystem(String dbName, String tableName,
+ String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
+
+ boolean isDeleted = false;
+
+ if (details != null && details.length != 0) {
+ for (LoadMetadataDetails oneLoad : details) {
+ if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+ String path = getSegmentPath(dbName, tableName, storeLocation, 0, oneLoad);
+ boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
+ if (deletionStatus) {
+ isDeleted = true;
+ oneLoad.setVisibility("false");
+ LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+ }
+ }
+ }
+ }
+
+ return isDeleted;
+ }
+
+
+}
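A sketch of how the deletion helper above could be invoked; the database name, table name and locations are assumed example values:

import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.processing.util.DeleteLoadFolders;

public class DeleteLoadFoldersSketch {
  // deletes the physical folders of loads that are marked for delete or compacted
  static boolean cleanStaleLoads(String metaDataLocation, String storeLocation) {
    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
    // isForceDelete = false: only loads older than the max query timeout are removed
    return DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
        "default", "sales", storeLocation, false, details);
  }
}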
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java
new file mode 100644
index 0000000..415eb8d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/LoadMetadataUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+
+/**
+ * Utility class for load metadata checks
+ */
+public final class LoadMetadataUtil {
+ private LoadMetadataUtil() {
+
+ }
+
+ public static boolean isLoadDeletionRequired(String metaDataLocation) {
+ LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+ if (details != null && details.length != 0) {
+ for (LoadMetadataDetails oneRow : details) {
+ if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
+ || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
+ && oneRow.getVisibility().equalsIgnoreCase("true")) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+
+ }
+}
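
isLoadDeletionRequired applies the same status and visibility conditions as checkIfLoadCanBeDeleted, so it can serve as a cheap, read-only guard before the physical deletion shown earlier. A hedged sketch; the clean-up method name is a placeholder, not part of this patch:

    // Hypothetical guard -- only LoadMetadataUtil.isLoadDeletionRequired is from this patch.
    if (LoadMetadataUtil.isLoadDeletionRequired(metaDataLocation)) {
      cleanUpStaleSegments();  // placeholder for the deleteLoadFoldersFromFileSystem flow above
    }
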
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java b/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java
new file mode 100644
index 0000000..fa910e6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/TableOptionConstant.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.util;
+
+/**
+ * Enum that holds the option names supported in the load DDL.
+ */
+public enum TableOptionConstant {
+ SERIALIZATION_NULL_FORMAT("serialization_null_format"),
+ BAD_RECORDS_LOGGER_ENABLE("bad_records_logger_enable"),
+ BAD_RECORDS_ACTION("bad_records_action");
+
+ private String name;
+
+ /**
+ * Constructor to initialize the enum with its option name.
+ * @param name the option name as it appears in the DDL
+ */
+ TableOptionConstant(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
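
The constants are meant to be used through getName() as option keys. A small usage sketch under that assumption; the options map and the example values are illustrative, not part of this file:

    // Hypothetical load options -- the keys are the real constants from this enum.
    Map<String, String> options = new HashMap<>();
    options.put(TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName(), "\\N");
    options.put(TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName(), "true");
    options.put(TableOptionConstant.BAD_RECORDS_ACTION.getName(), "REDIRECT");
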
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 3bb186e..61771ea 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -72,16 +72,15 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.constants.TableOptionConstant;
-import org.apache.carbondata.processing.csvload.BlockDetails;
-import org.apache.carbondata.processing.csvload.CSVInputFormat;
-import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator;
-import org.apache.carbondata.processing.csvload.StringArrayWritable;
-import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.DataLoadExecutor;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
@@ -384,7 +383,6 @@ public class StoreCreator {
path.delete();
}
- SchemaInfo info = new SchemaInfo();
BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
Configuration configuration = new Configuration();
@@ -411,9 +409,6 @@ public class StoreCreator {
storeLocationArray,
new CarbonIterator[]{readerIterator});
- info.setDatabaseName(databaseName);
- info.setTableName(tableName);
-
writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(),
new ArrayList<LoadMetadataDetails>());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
deleted file mode 100644
index 676838d..0000000
--- a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import junit.framework.TestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public class CSVInputFormatTest extends TestCase {
-
- /**
- * generate compressed files, no need to call this method.
- * @throws Exception
- */
- public void generateCompressFiles() throws Exception {
- String pwd = new File("src/test/resources/csv").getCanonicalPath();
- String inputFile = pwd + "/data.csv";
- FileInputStream input = new FileInputStream(inputFile);
- Configuration conf = new Configuration();
-
- // .gz
- String outputFile = pwd + "/data.csv.gz";
- FileOutputStream output = new FileOutputStream(outputFile);
- GzipCodec gzip = new GzipCodec();
- gzip.setConf(conf);
- CompressionOutputStream outputStream = gzip.createOutputStream(output);
- int i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- // .bz2
- input = new FileInputStream(inputFile);
- outputFile = pwd + "/data.csv.bz2";
- output = new FileOutputStream(outputFile);
- BZip2Codec bzip2 = new BZip2Codec();
- bzip2.setConf(conf);
- outputStream = bzip2.createOutputStream(output);
- i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- // .snappy
- input = new FileInputStream(inputFile);
- outputFile = pwd + "/data.csv.snappy";
- output = new FileOutputStream(outputFile);
- SnappyCodec snappy = new SnappyCodec();
- snappy.setConf(conf);
- outputStream = snappy.createOutputStream(output);
- i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- //.lz4
- input = new FileInputStream(inputFile);
- outputFile = pwd + "/data.csv.lz4";
- output = new FileOutputStream(outputFile);
- Lz4Codec lz4 = new Lz4Codec();
- lz4.setConf(conf);
- outputStream = lz4.createOutputStream(output);
- i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- }
-
- /**
- * CSVCheckMapper check the content of csv files.
- */
- public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
- NullWritable> {
- @Override
- protected void map(NullWritable key, StringArrayWritable value, Context context)
- throws IOException, InterruptedException {
- String[] columns = value.get();
- int id = Integer.parseInt(columns[0]);
- int salary = Integer.parseInt(columns[6]);
- Assert.assertEquals(id - 1, salary - 15000);
- }
- }
-
- /**
- * test read csv files
- * @throws Exception
- */
- @Test public void testReadCSVFiles() throws Exception{
- Configuration conf = new Configuration();
- prepareConf(conf);
- Job job = Job.getInstance(conf, "CSVInputFormat_normal");
- job.setJarByClass(CSVInputFormatTest.class);
- job.setMapperClass(CSVCheckMapper.class);
- job.setNumReduceTasks(0);
- job.setInputFormatClass(CSVInputFormat.class);
-
- String inputFolder = new File("src/test/resources/csv").getCanonicalPath();
- FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
- FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
- FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
- // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
- // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
-
- File output = new File("target/output_CSVInputFormatTest");
- deleteOutput(output);
- FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
-
- Assert.assertTrue(job.waitForCompletion(true));
- }
-
- private void prepareConf(Configuration conf) {
- conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
- conf.set(CSVInputFormat.MAX_COLUMNS, "10");
- conf.set(CSVInputFormat.NUMBER_OF_COLUMNS, "7");
- }
-
- private void deleteOutput(File output) {
- if (output.exists()) {
- if (output.isDirectory()) {
- for(File file : output.listFiles()) {
- deleteOutput(file);
- }
- output.delete();
- } else {
- output.delete();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java
new file mode 100644
index 0000000..925701d
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormatTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class CSVInputFormatTest extends TestCase {
+
+ /**
+ * Generates the compressed test files; this does not need to be called during the test run.
+ * @throws Exception
+ */
+ public void generateCompressFiles() throws Exception {
+ String pwd = new File("src/test/resources/csv").getCanonicalPath();
+ String inputFile = pwd + "/data.csv";
+ FileInputStream input = new FileInputStream(inputFile);
+ Configuration conf = new Configuration();
+
+ // .gz
+ String outputFile = pwd + "/data.csv.gz";
+ FileOutputStream output = new FileOutputStream(outputFile);
+ GzipCodec gzip = new GzipCodec();
+ gzip.setConf(conf);
+ CompressionOutputStream outputStream = gzip.createOutputStream(output);
+ int i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ // .bz2
+ input = new FileInputStream(inputFile);
+ outputFile = pwd + "/data.csv.bz2";
+ output = new FileOutputStream(outputFile);
+ BZip2Codec bzip2 = new BZip2Codec();
+ bzip2.setConf(conf);
+ outputStream = bzip2.createOutputStream(output);
+ i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ // .snappy
+ input = new FileInputStream(inputFile);
+ outputFile = pwd + "/data.csv.snappy";
+ output = new FileOutputStream(outputFile);
+ SnappyCodec snappy = new SnappyCodec();
+ snappy.setConf(conf);
+ outputStream = snappy.createOutputStream(output);
+ i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ //.lz4
+ input = new FileInputStream(inputFile);
+ outputFile = pwd + "/data.csv.lz4";
+ output = new FileOutputStream(outputFile);
+ Lz4Codec lz4 = new Lz4Codec();
+ lz4.setConf(conf);
+ outputStream = lz4.createOutputStream(output);
+ i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ }
+
+ /**
+ * CSVCheckMapper checks the content of the CSV files.
+ */
+ public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
+ NullWritable> {
+ @Override
+ protected void map(NullWritable key, StringArrayWritable value, Context context)
+ throws IOException, InterruptedException {
+ String[] columns = value.get();
+ int id = Integer.parseInt(columns[0]);
+ int salary = Integer.parseInt(columns[6]);
+ Assert.assertEquals(id - 1, salary - 15000);
+ }
+ }
+
+ /**
+ * Test reading CSV files in plain and compressed formats.
+ * @throws Exception
+ */
+ @Test public void testReadCSVFiles() throws Exception{
+ Configuration conf = new Configuration();
+ prepareConf(conf);
+ Job job = Job.getInstance(conf, "CSVInputFormat_normal");
+ job.setJarByClass(CSVInputFormatTest.class);
+ job.setMapperClass(CSVCheckMapper.class);
+ job.setNumReduceTasks(0);
+ job.setInputFormatClass(CSVInputFormat.class);
+
+ String inputFolder = new File("src/test/resources/csv").getCanonicalPath();
+ FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
+ FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
+ FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
+ // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
+ // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
+
+ File output = new File("target/output_CSVInputFormatTest");
+ deleteOutput(output);
+ FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
+
+ Assert.assertTrue(job.waitForCompletion(true));
+ }
+
+ private void prepareConf(Configuration conf) {
+ conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
+ conf.set(CSVInputFormat.MAX_COLUMNS, "10");
+ conf.set(CSVInputFormat.NUMBER_OF_COLUMNS, "7");
+ }
+
+ private void deleteOutput(File output) {
+ if (output.exists()) {
+ if (output.isDirectory()) {
+ for(File file : output.listFiles()) {
+ deleteOutput(file);
+ }
+ output.delete();
+ } else {
+ output.delete();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java b/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
new file mode 100644
index 0000000..88ff377
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionaryTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.dictionary;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.devapi.DictionaryGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InMemBiDictionaryTest {
+
+ /**
+ * test pre-created dictionary
+ */
+ @Test public void testPreCreated() throws Exception {
+ Map<Integer, String> map = new HashMap<>();
+ map.put(1, "amy");
+ map.put(2, "bob");
+ BiDictionary<Integer, String> dict = new InMemBiDictionary<>(map);
+ Assert.assertEquals(1, dict.getKey("amy").intValue());
+ Assert.assertEquals(2, dict.getKey("bob").intValue());
+ Assert.assertEquals("amy", dict.getValue(1));
+ Assert.assertEquals("bob", dict.getValue(2));
+ Assert.assertEquals(2, dict.size());
+ try {
+ dict.getOrGenerateKey("cat");
+ Assert.fail("add dictionary successfully");
+ } catch (Exception e) {
+ // test pass
+ }
+ }
+
+ /**
+ * test generating dictionary on the fly
+ */
+ @Test public void testGenerateDict() throws Exception {
+ BiDictionary<Integer, String> dict = new InMemBiDictionary<>(
+ new DictionaryGenerator<Integer, String>() {
+ int sequence = 1;
+ @Override
+ public Integer generateKey(String value) throws DictionaryGenerationException {
+ return sequence++;
+ }
+ });
+ Assert.assertEquals(1, dict.getOrGenerateKey("amy").intValue());
+ Assert.assertEquals(2, dict.getOrGenerateKey("bob").intValue());
+ Assert.assertEquals(1, dict.getKey("amy").intValue());
+ Assert.assertEquals(2, dict.getKey("bob").intValue());
+ Assert.assertEquals("amy", dict.getValue(1));
+ Assert.assertEquals("bob", dict.getValue(2));
+ Assert.assertEquals(2, dict.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java b/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
deleted file mode 100644
index 6d82cce..0000000
--- a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class InMemBiDictionaryTest {
-
- /**
- * test pre-created dictionary
- */
- @Test public void testPreCreated() throws Exception {
- Map<Integer, String> map = new HashMap<>();
- map.put(1, "amy");
- map.put(2, "bob");
- BiDictionary<Integer, String> dict = new InMemBiDictionary<>(map);
- Assert.assertEquals(1, dict.getKey("amy").intValue());
- Assert.assertEquals(2, dict.getKey("bob").intValue());
- Assert.assertEquals("amy", dict.getValue(1));
- Assert.assertEquals("bob", dict.getValue(2));
- Assert.assertEquals(2, dict.size());
- try {
- dict.getOrGenerateKey("cat");
- Assert.fail("add dictionary successfully");
- } catch (Exception e) {
- // test pass
- }
- }
-
- /**
- * test generating dictionary on the fly
- */
- @Test public void testGenerateDict() throws Exception {
- BiDictionary<Integer, String> dict = new InMemBiDictionary<>(
- new DictionaryGenerator<Integer, String>() {
- int sequence = 1;
- @Override
- public Integer generateKey(String value) throws DictionaryGenerationException {
- return sequence++;
- }
- });
- Assert.assertEquals(1, dict.getOrGenerateKey("amy").intValue());
- Assert.assertEquals(2, dict.getOrGenerateKey("bob").intValue());
- Assert.assertEquals(1, dict.getKey("amy").intValue());
- Assert.assertEquals(2, dict.getKey("bob").intValue());
- Assert.assertEquals("amy", dict.getValue(1));
- Assert.assertEquals("bob", dict.getValue(2));
- Assert.assertEquals(2, dict.size());
- }
-}