You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/12 15:06:27 UTC
[1/5] carbondata git commit: [CARBONDATA-1289] remove unused method
Repository: carbondata
Updated Branches:
refs/heads/encoding_override 403c3d9b4 -> 9e4da2a6c
[CARBONDATA-1289] remove unused method
This closes #1157
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b31f09b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b31f09b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b31f09b
Branch: refs/heads/encoding_override
Commit: 8b31f09b638cb5b8fbbdfa56f29f8c97e68e6aa6
Parents: 403c3d9
Author: czg516516 <cz...@163.com>
Authored: Tue Jul 11 11:01:47 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Jul 11 12:56:18 2017 +0800
----------------------------------------------------------------------
.../newflow/exception/CarbonDataLoadingException.java | 13 -------------
1 file changed, 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b31f09b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
index b9593e7..6ffdd03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
@@ -17,8 +17,6 @@
package org.apache.carbondata.processing.newflow.exception;
-import java.util.Locale;
-
public class CarbonDataLoadingException extends RuntimeException {
/**
* default serial version ID.
@@ -60,17 +58,6 @@ public class CarbonDataLoadingException extends RuntimeException {
}
/**
- * This method is used to get the localized message.
- *
- * @param locale - A Locale object represents a specific geographical,
- * political, or cultural region.
- * @return - Localized error message.
- */
- public String getLocalizedMessage(Locale locale) {
- return "";
- }
-
- /**
* getLocalizedMessage
*/
@Override public String getLocalizedMessage() {
[2/5] carbondata git commit: [CARBONDATA-1277] Dictionary generation
failure due to hdfs lease expiry
Posted by ja...@apache.org.
[CARBONDATA-1277] Dictionary generation failure due to hdfs lease expiry
This closes #1147
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/285ce72d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/285ce72d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/285ce72d
Branch: refs/heads/encoding_override
Commit: 285ce72d4c9b3364bbdc454f4b6b331b3caa42db
Parents: 8b31f09
Author: manishgupta88 <to...@gmail.com>
Authored: Sat Jul 8 15:46:25 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Jul 11 18:00:18 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 7 +
.../core/datastore/impl/FileFactory.java | 21 ++
.../AtomicFileOperationsImpl.java | 5 +-
.../apache/carbondata/core/util/CarbonUtil.java | 2 +-
.../core/util/path/HDFSLeaseUtils.java | 215 +++++++++++++++++++
.../core/writer/CarbonDictionaryWriterImpl.java | 20 +-
.../carbondata/core/writer/ThriftWriter.java | 2 +-
7 files changed, 266 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 208bab8..8110abb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1287,6 +1287,13 @@ public final class CarbonCommonConstants {
public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
+ @CarbonProperty
+ public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
+ "carbon.lease.recovery.retry.count";
+ @CarbonProperty
+ public static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL =
+ "carbon.lease.recovery.retry.interval";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 7acd6b1..2a35ab3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -518,4 +518,25 @@ public final class FileFactory {
}
}
+ /**
+ * This method will create the path object for a given file
+ *
+ * @param filePath
+ * @return
+ */
+ public static Path getPath(String filePath) {
+ return new Path(filePath);
+ }
+
+ /**
+ * This method will return the filesystem instance
+ *
+ * @param path
+ * @return
+ * @throws IOException
+ */
+ public static FileSystem getFileSystem(Path path) throws IOException {
+ return path.getFileSystem(configuration);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
index befc76e..61690ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.util.CarbonUtil;
public class AtomicFileOperationsImpl implements AtomicFileOperations {
@@ -67,10 +68,8 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations {
@Override public void close() throws IOException {
if (null != dataOutStream) {
- dataOutStream.close();
-
+ CarbonUtil.closeStream(dataOutStream);
CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-
if (!tempFile.renameForce(filePath)) {
throw new IOException("temporary file renaming failed, src="
+ tempFile.getPath() + ", dest=" + filePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8298600..06b2a61 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -128,7 +128,7 @@ public final class CarbonUtil {
try {
closeStream(stream);
} catch (IOException e) {
- LOGGER.error("Error while closing stream:" + e);
+ LOGGER.error(e, "Error while closing stream:" + e);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
new file mode 100644
index 0000000..c72c322
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+
+/**
+ * Implementation for HDFS utility methods
+ */
+public class HDFSLeaseUtils {
+
+ private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN = 1;
+ private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX = 50;
+ private static final String CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT = "5";
+ private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN = 1000;
+ private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX = 10000;
+ private static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT = "1000";
+
+ /**
+ * LOGGER
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(HDFSLeaseUtils.class.getName());
+
+ /**
+ * This method will validate whether the exception thrown is for lease recovery from HDFS
+ *
+ * @param message
+ * @return
+ */
+ public static boolean checkExceptionMessageForLeaseRecovery(String message) {
+ // depending on the scenario few more cases can be added for validating lease recovery exception
+ if (null != message && message.contains("Failed to APPEND_FILE")) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This method will make attempts to recover lease on a file using the
+ * distributed file system utility.
+ *
+ * @param filePath
+ * @return
+ * @throws IOException
+ */
+ public static boolean recoverFileLease(String filePath) throws IOException {
+ LOGGER.info("Trying to recover lease on file: " + filePath);
+ FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+ switch (fileType) {
+ case ALLUXIO:
+ case HDFS:
+ Path path = FileFactory.getPath(filePath);
+ FileSystem fs = FileFactory.getFileSystem(path);
+ return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs);
+ case VIEWFS:
+ path = FileFactory.getPath(filePath);
+ fs = FileFactory.getFileSystem(path);
+ ViewFileSystem viewFileSystem = (ViewFileSystem) fs;
+ Path targetFileSystemPath = viewFileSystem.resolvePath(path);
+ FileSystem targetFileSystem = FileFactory.getFileSystem(targetFileSystemPath);
+ if (targetFileSystem instanceof DistributedFileSystem) {
+ return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) targetFileSystem);
+ } else {
+ LOGGER.error(
+ "Invalid file type. Lease recovery is not supported on filesystem with file: "
+ + filePath);
+ return false;
+ }
+ default:
+ LOGGER.error("Invalid file type. Lease recovery is not supported on filesystem with file: "
+ + filePath);
+ return false;
+ }
+ }
+
+ /**
+ * Recovers lease on a file
+ *
+ * @param filePath
+ * @param path
+ * @param fs
+ * @return
+ * @throws IOException
+ */
+ private static boolean recoverLeaseOnFile(String filePath, Path path, DistributedFileSystem fs)
+ throws IOException {
+ DistributedFileSystem dfs = fs;
+ int maxAttempts = getLeaseRecoveryRetryCount();
+ int retryInterval = getLeaseRecoveryRetryInterval();
+ boolean leaseRecovered = false;
+ IOException ioException = null;
+ for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
+ try {
+ leaseRecovered = dfs.recoverLease(path);
+ if (!leaseRecovered) {
+ try {
+ LOGGER.info(
+ "Failed to recover lease after attempt " + retryCount + " . Will try again after "
+ + retryInterval + " ms...");
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException e) {
+ LOGGER.error(e,
+ "Interrupted exception occurred while recovering lease for file : " + filePath);
+ }
+ }
+ } catch (IOException e) {
+ if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
+ LOGGER.error("The given file does not exist at path " + filePath);
+ throw e;
+ } else if (e instanceof FileNotFoundException) {
+ LOGGER.error("The given file does not exist at path " + filePath);
+ throw e;
+ } else {
+ LOGGER.error("Recover lease threw exception : " + e.getMessage());
+ ioException = e;
+ }
+ }
+ LOGGER.info("Retrying again after interval of " + retryInterval + " ms...");
+ }
+ if (leaseRecovered) {
+ LOGGER.info("Successfully able to recover lease on file: " + filePath);
+ return true;
+ } else {
+ LOGGER.error(
+ "Failed to recover lease on file: " + filePath + " after retrying for " + maxAttempts
+ + " at an interval of " + retryInterval);
+ if (null != ioException) {
+ throw ioException;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ private static int getLeaseRecoveryRetryCount() {
+ String retryMaxAttempts = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT,
+ CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
+ int retryCount = 0;
+ try {
+ retryCount = Integer.parseInt(retryMaxAttempts);
+ if (retryCount < CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN
+ || retryCount > CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX) {
+ retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
+ LOGGER.warn(
+ "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT
+ + " is not in allowed range. Allowed range is >="
+ + CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN + " and <="
+ + CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX + ". Therefore considering default value: "
+ + retryCount);
+ }
+ } catch (NumberFormatException ne) {
+ retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
+ LOGGER.warn("value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT
+ + " is incorrect. Therefore considering default value: " + retryCount);
+ }
+ return retryCount;
+ }
+
+ private static int getLeaseRecoveryRetryInterval() {
+ String retryMaxAttempts = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL,
+ CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
+ int retryCount = 0;
+ try {
+ retryCount = Integer.parseInt(retryMaxAttempts);
+ if (retryCount < CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN
+ || retryCount > CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX) {
+ retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
+ LOGGER.warn(
+ "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL
+ + " is not in allowed range. Allowed range is >="
+ + CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN + " and <="
+ + CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX
+ + ". Therefore considering default value (ms): " + retryCount);
+ }
+ } catch (NumberFormatException ne) {
+ retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
+ LOGGER.warn(
+ "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL
+ + " is incorrect. Therefore considering default value (ms): " + retryCount);
+ }
+ return retryCount;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
index 9de41e1..64ff202 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.service.CarbonCommonFactory;
import org.apache.carbondata.core.service.PathService;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.util.path.HDFSLeaseUtils;
import org.apache.carbondata.format.ColumnDictionaryChunk;
import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
@@ -359,7 +360,24 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
// create thrift writer instance
dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true);
// open the file stream
- dictionaryThriftWriter.open();
+ try {
+ dictionaryThriftWriter.open();
+ } catch (IOException e) {
+ // Cases to handle
+ // 1. Handle File lease recovery
+ if (HDFSLeaseUtils.checkExceptionMessageForLeaseRecovery(e.getMessage())) {
+ LOGGER.error(e, "Lease recovery exception encountered for file: " + dictionaryFile);
+ boolean leaseRecovered = HDFSLeaseUtils.recoverFileLease(dictionaryFile);
+ if (leaseRecovered) {
+ // try to open output stream again after recovering the lease on file
+ dictionaryThriftWriter.open();
+ } else {
+ throw e;
+ }
+ } else {
+ throw e;
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
index 9bf549d..d7b1a0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
@@ -136,7 +136,7 @@ public class ThriftWriter {
*/
public void close() throws IOException {
closeAtomicFileWriter();
- CarbonUtil.closeStreams(dataOutputStream);
+ CarbonUtil.closeStream(dataOutputStream);
}
/**
[3/5] carbondata git commit: [CARBONDATA-1271] Enhanced Performance
for Hive Integration with Carbondata
Posted by ja...@apache.org.
[CARBONDATA-1271] Enhanced Performance for Hive Integration with Carbondata
This closes #1142
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cbe14197
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cbe14197
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cbe14197
Branch: refs/heads/encoding_override
Commit: cbe141976a53a558b84d6e31baf3ec54a9bc38cc
Parents: 285ce72
Author: Bhavya <bh...@knoldus.com>
Authored: Thu Jul 6 11:53:03 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 12 17:40:11 2017 +0800
----------------------------------------------------------------------
.../core/stats/QueryStatisticsRecorderImpl.java | 80 +++---
.../carbondata/hadoop/CarbonInputFormat.java | 7 +-
.../carbondata/hive/CarbonArrayInspector.java | 4 -
.../hive/CarbonDictionaryDecodeReadSupport.java | 288 +++++++++++++++++++
.../carbondata/hive/CarbonHiveInputSplit.java | 23 +-
.../carbondata/hive/CarbonHiveRecordReader.java | 67 ++---
.../apache/carbondata/hive/CarbonHiveSerDe.java | 36 +--
.../hive/MapredCarbonInputFormat.java | 129 ++++++---
.../hive/server/HiveEmbeddedServer2.java | 1 +
9 files changed, 477 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
index f84a674..ffb7d7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
@@ -101,45 +101,47 @@ public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder, Ser
long scannedPages = 0;
try {
for (QueryStatistic statistic : queryStatistics) {
- switch (statistic.getMessage()) {
- case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
- load_blocks_time += statistic.getTimeTaken();
- break;
- case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
- scan_blocks_time += statistic.getCount();
- break;
- case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
- scan_blocks_num += statistic.getCount();
- break;
- case QueryStatisticsConstants.LOAD_DICTIONARY:
- load_dictionary_time += statistic.getTimeTaken();
- break;
- case QueryStatisticsConstants.RESULT_SIZE:
- result_size += statistic.getCount();
- break;
- case QueryStatisticsConstants.EXECUTOR_PART:
- total_executor_time += statistic.getTimeTaken();
- break;
- case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
- total_blocklet = statistic.getCount();
- break;
- case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
- valid_scan_blocklet = statistic.getCount();
- break;
- case QueryStatisticsConstants.VALID_PAGE_SCANNED:
- valid_pages_blocklet = statistic.getCount();
- break;
- case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
- total_pages = statistic.getCount();
- break;
- case QueryStatisticsConstants.READ_BLOCKlET_TIME:
- readTime = statistic.getCount();
- break;
- case QueryStatisticsConstants.PAGE_SCANNED:
- scannedPages = statistic.getCount();
- break;
- default:
- break;
+ if (statistic.getMessage() != null) {
+ switch (statistic.getMessage()) {
+ case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
+ load_blocks_time += statistic.getTimeTaken();
+ break;
+ case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
+ scan_blocks_time += statistic.getCount();
+ break;
+ case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
+ scan_blocks_num += statistic.getCount();
+ break;
+ case QueryStatisticsConstants.LOAD_DICTIONARY:
+ load_dictionary_time += statistic.getTimeTaken();
+ break;
+ case QueryStatisticsConstants.RESULT_SIZE:
+ result_size += statistic.getCount();
+ break;
+ case QueryStatisticsConstants.EXECUTOR_PART:
+ total_executor_time += statistic.getTimeTaken();
+ break;
+ case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
+ total_blocklet = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
+ valid_scan_blocklet = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.VALID_PAGE_SCANNED:
+ valid_pages_blocklet = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
+ total_pages = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.READ_BLOCKlET_TIME:
+ readTime = statistic.getCount();
+ break;
+ case QueryStatisticsConstants.PAGE_SCANNED:
+ scannedPages = statistic.getCount();
+ break;
+ default:
+ break;
+ }
}
}
String headers =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1e69648..16b5d69 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -444,9 +444,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
}
}
}
+
+ // For Hive integration, fall back to hive.query.id when query.id is not set so stats can be recorded
+ String query_id = job.getConfiguration().get("query.id") != null ?
+ job.getConfiguration().get("query.id") :
+ job.getConfiguration().get("hive.query.id");
statistic
.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
- recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ recorder.recordStatisticsForDriver(statistic, query_id);
return resultFilterredBlocks;
} finally {
// clean up the access count for a segment as soon as its usage is complete so that in
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
index 49e068a..b26c959 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.hive;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -122,9 +121,6 @@ class CarbonArrayInspector implements SettableListObjectInspector {
final Writable[] array = ((ArrayWritable) subObj).get();
final List<Writable> list = Arrays.asList(array);
-
- Collections.addAll(list, array);
-
return list;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
new file mode 100644
index 0000000..bc66d49
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+
+/**
+ * This is the class to decode dictionary encoded column data back to its original value.
+ */
+public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
+
+ protected Dictionary[] dictionaries;
+
+ protected DataType[] dataTypes;
+ /**
+ * carbon columns
+ */
+ protected CarbonColumn[] carbonColumns;
+
+ protected Writable[] writableArr;
+
+ /**
+  * This initialization is done inside executor task
+  * for column dictionary involved in decoding.
+  * The column array is retained, the data type of every column is recorded, and a forward
+  * dictionary is fetched for each column that is DICTIONARY encoded but neither
+  * DIRECT_DICTIONARY encoded nor complex.
+  *
+  * @param carbonColumns column list
+  * @param absoluteTableIdentifier table identifier; supplies the store path and the carbon
+  *                                table identifier used for the dictionary lookup
+  * @throws IOException presumably raised by the dictionary cache lookup - confirm against Cache
+  */
+ @Override public void initialize(CarbonColumn[] carbonColumns,
+     AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
+   this.carbonColumns = carbonColumns;
+   final int columnCount = carbonColumns.length;
+   dictionaries = new Dictionary[columnCount];
+   dataTypes = new DataType[columnCount];
+   for (int idx = 0; idx < columnCount; idx++) {
+     CarbonColumn column = carbonColumns[idx];
+     // the data type is recorded for every column, dictionary encoded or not
+     dataTypes[idx] = column.getDataType();
+     boolean needsDictionary = column.hasEncoding(Encoding.DICTIONARY)
+         && !column.hasEncoding(Encoding.DIRECT_DICTIONARY) && !column.isComplex();
+     if (!needsDictionary) {
+       // dictionaries[idx] stays null, which readRow treats as "no decoding needed"
+       continue;
+     }
+     Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache =
+         CacheProvider.getInstance()
+             .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+     DictionaryColumnUniqueIdentifier dictionaryKey = new DictionaryColumnUniqueIdentifier(
+         absoluteTableIdentifier.getCarbonTableIdentifier(), column.getColumnIdentifier(),
+         dataTypes[idx]);
+     dictionaries[idx] = forwardDictionaryCache.get(dictionaryKey);
+   }
+ }
+
+ /**
+  * Converts one raw row into an array of Writables. For every column that has a dictionary
+  * the entry in {@code data} is first replaced by the dictionary value for its key, then each
+  * entry is converted with createWritableObject.
+  *
+  * @param data raw row values; entries of dictionary columns are overwritten in place
+  * @return the Writable[] built for the row, cast (unchecked) to T
+  */
+ @Override public T readRow(Object[] data) {
+   assert (data.length == dictionaries.length);
+   writableArr = new Writable[data.length];
+   try {
+     for (int col = 0; col < dictionaries.length; col++) {
+       Dictionary dictionary = dictionaries[col];
+       if (dictionary != null) {
+         data[col] = dictionary.getDictionaryValueForKey((int) data[col]);
+       }
+       writableArr[col] = createWritableObject(data[col], carbonColumns[col]);
+     }
+   } catch (IOException e) {
+     // readRow declares no checked exception, so conversion failures are rethrown unchecked
+     throw new RuntimeException(e.getMessage(), e);
+   }
+
+   return (T) writableArr;
+ }
+
+ /**
+  * Hands every dictionary slot to CarbonUtil.clearDictionaryCache so the dictionary cache
+  * can do its book keeping (access count update) for each column involved during decode,
+  * to facilitate LRU cache policy if memory threshold is reached.
+  * Does nothing when initialize was never called.
+  */
+ @Override public void close() {
+   if (dictionaries != null) {
+     // slots of non-dictionary columns are null and are passed through as they are
+     for (Dictionary dictionary : dictionaries) {
+       CarbonUtil.clearDictionaryCache(dictionary);
+     }
+   }
+ }
+
+ /**
+  * Converts a CarbonData value into a Writable, dispatching on the data type of the column:
+  * STRUCT and ARRAY go to the complex type converters, everything else is handled as a
+  * primitive.
+  *
+  * @param obj value read from CarbonData
+  * @param carbonColumn column the value belongs to; its data type selects the conversion
+  * @return the Writable built by the selected converter (may be null)
+  * @throws IOException propagated from the selected converter
+  */
+ private Writable createWritableObject(Object obj, CarbonColumn carbonColumn) throws IOException {
+   final DataType columnType = carbonColumn.getDataType();
+   final Writable converted;
+   switch (columnType) {
+     case STRUCT:
+       converted = createStruct(obj, carbonColumn);
+       break;
+     case ARRAY:
+       converted = createArray(obj, carbonColumn);
+       break;
+     default:
+       converted = createWritablePrimitive(obj, carbonColumn);
+       break;
+   }
+   return converted;
+ }
+
+ /**
+  * Create Array Data for Array Datatype.
+  * The elements are converted with the first child dimension of the column and wrapped
+  * twice: an inner ArrayWritable holding the elements, inside an outer ArrayWritable with
+  * that single entry.
+  *
+  * @param obj value read from CarbonData; only GenericArrayData is converted
+  * @param carbonColumn the array column; its first child dimension describes the elements
+  * @return the nested ArrayWritable, or null when obj is not a GenericArrayData or the
+  *         array is null or empty
+  * @throws IOException if the array has elements but the column exposes no child dimension,
+  *                     or if converting an element fails
+  */
+ private ArrayWritable createArray(Object obj, CarbonColumn carbonColumn) throws IOException {
+   if (!(obj instanceof GenericArrayData)) {
+     return null;
+   }
+   Object[] objArray = ((GenericArrayData) obj).array();
+   CarbonDimension arrayDimension = null;
+   if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+     List<CarbonDimension> childCarbonDimensions =
+         ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+     arrayDimension = childCarbonDimensions.get(0);
+   }
+   if (objArray == null || objArray.length == 0) {
+     return null;
+   }
+   if (arrayDimension == null) {
+     // without this check the null dimension is dereferenced while converting the first element
+     throw new IOException(
+         "No child dimension found for array column : " + carbonColumn.getColName());
+   }
+   // a typed array replaces the former raw List and its unchecked toArray cast
+   Writable[] elements = new Writable[objArray.length];
+   for (int i = 0; i < objArray.length; i++) {
+     elements[i] = createWritableObject(objArray[i], arrayDimension);
+   }
+   ArrayWritable subArray = new ArrayWritable(Writable.class, elements);
+   return new ArrayWritable(Writable.class, new Writable[] { subArray });
+ }
+
+ /**
+  * Create the Struct data for the Struct Datatype.
+  * Every field value of the row is converted with the child dimension at the same position.
+  *
+  * @param obj value read from CarbonData; must be a GenericInternalRow
+  * @param carbonColumn the struct column whose child dimensions describe the fields
+  * @return an ArrayWritable holding one Writable per field
+  * @throws IOException if obj is not a GenericInternalRow, or if converting a field fails
+  */
+ private ArrayWritable createStruct(Object obj, CarbonColumn carbonColumn) throws IOException {
+   if (!(obj instanceof GenericInternalRow)) {
+     throw new IOException("DataType not supported in Carbondata");
+   }
+   final Object[] fieldValues = ((GenericInternalRow) obj).values();
+   final boolean hasChildren =
+       carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0;
+   // NOTE(review): stays null when the column reports no children; a non-empty row then
+   // fails on the first lookup below
+   final List<CarbonDimension> fieldDimensions =
+       hasChildren ? ((CarbonDimension) carbonColumn).getListOfChildDimensions() : null;
+   final Writable[] fields = new Writable[fieldValues.length];
+   int pos = 0;
+   for (Object fieldValue : fieldValues) {
+     fields[pos] = createWritableObject(fieldValue, fieldDimensions.get(pos));
+     pos++;
+   }
+   return new ArrayWritable(Writable.class, fields);
+ }
+
+ /**
+  * This method will create the Writable Objects for primitives.
+  *
+  * @param obj value read from CarbonData; a null value is passed through as null
+  * @param carbonColumn column the value belongs to; its data type selects the Writable class
+  * @return a new Writable wrapping the value, or null for a null value or the NULL data type
+  * @throws IOException if the data type is none of the primitives handled here
+  */
+ private Writable createWritablePrimitive(Object obj, CarbonColumn carbonColumn)
+     throws IOException {
+   final DataType columnType = carbonColumn.getDataType();
+   if (obj == null) {
+     return null;
+   }
+   switch (columnType) {
+     case NULL:
+       return null;
+     case DOUBLE:
+       return new DoubleWritable((Double) obj);
+     case INT:
+       return new IntWritable((Integer) obj);
+     case LONG:
+       return new LongWritable((Long) obj);
+     case SHORT:
+       return new ShortWritable((Short) obj);
+     case DATE:
+       // NOTE(review): java.sql.Date(long) takes epoch milliseconds - confirm the Integer
+       // read here is not a day count
+       return new DateWritable(new Date((Integer) obj));
+     case TIMESTAMP:
+       return new TimestampWritable(new Timestamp((Long) obj));
+     case STRING:
+       return new Text(obj.toString());
+     case DECIMAL:
+       return new HiveDecimalWritable(
+           HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+     default:
+       throw new IOException("Unknown primitive : " + columnType.getName());
+   }
+ }
+
+ /**
+  * If we need to use the same Writable[] then we can use this method.
+  * It overwrites the content of an existing Writable instead of allocating a new one.
+  *
+  * @param writable the Writable to overwrite; must be of the class matching the data type
+  * @param obj the new value; must not be null
+  * @param carbonColumn column the value belongs to; its data type selects the cast applied
+  * @throws IOException if obj is null, since none of the setters used here can represent null
+  */
+ private void setPrimitive(Writable writable, Object obj, CarbonColumn carbonColumn)
+     throws IOException {
+   DataType dataType = carbonColumn.getDataType();
+   if (obj == null) {
+     // the former writable.write(null) passed a null DataOutput and then fell through into
+     // the switch, where the null value was unboxed; fail with an explicit message instead
+     throw new IOException(
+         "Cannot set a null value on a Writable of type : " + dataType.getName());
+   }
+   switch (dataType) {
+     case DOUBLE:
+       ((DoubleWritable) writable).set((double) obj);
+       break;
+     case INT:
+       ((IntWritable) writable).set((int) obj);
+       break;
+     case LONG:
+       ((LongWritable) writable).set((long) obj);
+       break;
+     case SHORT:
+       ((ShortWritable) writable).set((short) obj);
+       break;
+     case DATE:
+       // NOTE(review): createWritablePrimitive casts DATE values to Integer while this
+       // method casts to Long - confirm which representation the reader delivers
+       ((DateWritable) writable).set(new Date((Long) obj));
+       break;
+     case TIMESTAMP:
+       ((TimestampWritable) writable).set(new Timestamp((long) obj));
+       break;
+     case STRING:
+       ((Text) writable).set(obj.toString());
+       break;
+     case DECIMAL:
+       ((HiveDecimalWritable) writable)
+           .set(HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+       break;
+     default:
+       // other data types leave the Writable untouched, as before
+       break;
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
index bfe4d27..b922295 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
@@ -113,8 +113,7 @@ public class CarbonHiveInputSplit extends FileSplit
}
public static CarbonHiveInputSplit from(String segmentId, FileSplit split,
- ColumnarFormatVersion version)
- throws IOException {
+ ColumnarFormatVersion version) throws IOException {
return new CarbonHiveInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
split.getLocations(), version);
}
@@ -151,8 +150,7 @@ public class CarbonHiveInputSplit extends FileSplit
return segmentId;
}
- @Override
- public void readFields(DataInput in) throws IOException {
+ @Override public void readFields(DataInput in) throws IOException {
super.readFields(in);
this.segmentId = in.readUTF();
this.version = ColumnarFormatVersion.valueOf(in.readShort());
@@ -162,10 +160,10 @@ public class CarbonHiveInputSplit extends FileSplit
for (int i = 0; i < numInvalidSegment; i++) {
invalidSegments.add(in.readUTF());
}
+ this.numberOfBlocklets = in.readInt();
}
- @Override
- public void write(DataOutput out) throws IOException {
+ @Override public void write(DataOutput out) throws IOException {
super.write(out);
out.writeUTF(segmentId);
out.writeShort(version.number());
@@ -174,6 +172,7 @@ public class CarbonHiveInputSplit extends FileSplit
for (String invalidSegment : invalidSegments) {
out.writeUTF(invalidSegment);
}
+ out.writeInt(numberOfBlocklets);
}
public List<String> getInvalidSegments() {
@@ -213,8 +212,7 @@ public class CarbonHiveInputSplit extends FileSplit
return bucketId;
}
- @Override
- public int compareTo(Distributable o) {
+ @Override public int compareTo(Distributable o) {
if (o == null) {
return -1;
}
@@ -264,18 +262,15 @@ public class CarbonHiveInputSplit extends FileSplit
return 0;
}
- @Override
- public String getBlockPath() {
+ @Override public String getBlockPath() {
return getPath().getName();
}
- @Override
- public List<Long> getMatchedBlocklets() {
+ @Override public List<Long> getMatchedBlocklets() {
return null;
}
- @Override
- public boolean fullScan() {
+ @Override public boolean fullScan() {
return true;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index e4df02e..2a92185 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -62,6 +62,8 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
private ArrayWritable valueObj = null;
private CarbonObjectInspector objInspector;
+ private long recordReaderCounter = 0;
+ private int[] columnIds;
public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
InputSplit inputSplit, JobConf jobConf) throws IOException {
@@ -88,17 +90,12 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
} catch (QueryExecutionException e) {
throw new IOException(e.getMessage(), e.getCause());
}
- if (valueObj == null) {
- valueObj =
- new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]);
- }
-
final TypeInfo rowTypeInfo;
final List<String> columnNames;
List<TypeInfo> columnTypes;
// Get column names and sort order
final String colIds = conf.get("hive.io.file.readcolumn.ids");
- final String columnNameProperty = conf.get("hive.io.file.readcolumn.names");
+ final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
if (columnNameProperty.length() == 0) {
@@ -111,47 +108,39 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
+
+ if (valueObj == null) {
+ valueObj = new ArrayWritable(Writable.class, new Writable[columnTypes.size()]);
+ }
+
if (!colIds.equals("")) {
String[] arraySelectedColId = colIds.split(",");
List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
-
- for (String anArrayColId : arraySelectedColId) {
- reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
+ columnIds = new int[arraySelectedColId.length];
+ int columnId = 0;
+ for (int j = 0; j < arraySelectedColId.length; j++) {
+ columnId = Integer.parseInt(arraySelectedColId[j]);
+ columnIds[j] = columnId;
}
- // Create row related objects
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
- } else {
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
}
+
+ rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
}
@Override public boolean next(Void aVoid, ArrayWritable value) throws IOException {
if (carbonIterator.hasNext()) {
Object obj = readSupport.readRow(carbonIterator.next());
- ArrayWritable tmpValue;
- try {
- tmpValue = createArrayWritable(obj);
- } catch (SerDeException se) {
- throw new IOException(se.getMessage(), se.getCause());
- }
-
- if (value != tmpValue) {
- final Writable[] arrValue = value.get();
- final Writable[] arrCurrent = tmpValue.get();
- if (valueObj != null && arrValue.length == arrCurrent.length) {
- System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
- } else {
- if (arrValue.length != arrCurrent.length) {
- throw new IOException(
- "CarbonHiveInput : size of object differs. Value" + " size : " + arrValue.length
- + ", Current Object size : " + arrCurrent.length);
- } else {
- throw new IOException("CarbonHiveInput can not support RecordReaders that"
- + " don't return same key & value & value is null");
- }
+ recordReaderCounter++;
+ Writable[] objArray = (Writable[]) obj;
+ Writable[] sysArray = new Writable[value.get().length];
+ if (columnIds != null && columnIds.length > 0 && objArray.length == columnIds.length) {
+ for (int i = 0; i < columnIds.length; i++) {
+ sysArray[columnIds[i]] = objArray[i];
}
+ value.set(sysArray);
+ } else {
+ value.set(objArray);
}
return true;
} else {
@@ -159,10 +148,6 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
}
}
- private ArrayWritable createArrayWritable(Object obj) throws SerDeException {
- return createStruct(obj, objInspector);
- }
-
@Override public Void createKey() {
return null;
}
@@ -172,7 +157,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
}
@Override public long getPos() throws IOException {
- return 0;
+ return recordReaderCounter;
}
@Override public float getProgress() throws IOException {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index f66f3ed..2980ad3 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -79,11 +79,9 @@ class CarbonHiveSerDe extends AbstractSerDe {
final TypeInfo rowTypeInfo;
final List<String> columnNames;
- final List<String> reqColNames;
final List<TypeInfo> columnTypes;
// Get column names and sort order
assert configuration != null;
- final String colIds = configuration.get("hive.io.file.readcolumn.ids");
final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
@@ -98,29 +96,17 @@ class CarbonHiveSerDe extends AbstractSerDe {
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
- if (colIds != null && !colIds.equals("")) {
- reqColNames = new ArrayList<String>();
-
- String[] arraySelectedColId = colIds.split(",");
- List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
- for (String anArrayColId : arraySelectedColId) {
- reqColNames.add(columnNames.get(Integer.parseInt(anArrayColId)));
- reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
- }
- // Create row related objects
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(reqColNames, reqColTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
- }
- else {
- // Create row related objects
- rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
- this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-
- // Stats part
- serializedSize = 0;
- deserializedSize = 0;
- status = LAST_OPERATION.UNKNOWN;
- }
+
+
+
+ // Create row related objects
+ rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
+
+ // Stats part
+ serializedSize = 0;
+ deserializedSize = 0;
+ status = LAST_OPERATION.UNKNOWN;
}
@Override public Class<? extends Writable> getSerializedClass() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 7a1c9db..58f25c9 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -17,12 +17,12 @@
package org.apache.carbondata.hive;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
@@ -31,8 +31,11 @@ import org.apache.carbondata.hadoop.CarbonInputFormat;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.io.ArrayWritable;
@@ -42,9 +45,11 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {
+ private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
@Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
@@ -63,47 +68,64 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
@Override
public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
Reporter reporter) throws IOException {
- QueryModel queryModel = getQueryModel(jobConf);
- CarbonReadSupport<ArrayWritable> readSupport = getReadSupportClass(jobConf);
+ String path = null;
+ if (inputSplit instanceof CarbonHiveInputSplit) {
+ path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
+ }
+ QueryModel queryModel = getQueryModel(jobConf, path);
+ CarbonReadSupport<ArrayWritable> readSupport = new CarbonDictionaryDecodeReadSupport<>();
return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
}
- private QueryModel getQueryModel(Configuration configuration) throws IOException {
- CarbonTable carbonTable = getCarbonTable(configuration);
+ /**
+  * this method will read the schema from the physical file and populate into CARBON_TABLE
+  * The table path used is the first entry of INPUT_DIR that is a prefix of the given path.
+  *
+  * @param configuration job configuration holding INPUT_DIR; receives the carbon table
+  * @param paths path of the split being read; used to pick the matching input dir
+  * @throws IOException if no configured input dir matches the given path, or if reading
+  *                     the schema fails
+  */
+ private static void populateCarbonTable(Configuration configuration, String paths)
+     throws IOException {
+   String dirs = configuration.get(INPUT_DIR, "");
+   String[] inputPaths = StringUtils.split(dirs);
+   if (inputPaths.length == 0) {
+     throw new InvalidPathException("No input paths specified in job");
+   }
+   String validInputPath = null;
+   if (paths != null) {
+     for (String inputPath : inputPaths) {
+       if (paths.startsWith(inputPath)) {
+         validInputPath = inputPath;
+         break;
+       }
+     }
+   }
+   if (validInputPath == null) {
+     // previously a null table path was handed on to AbsoluteTableIdentifier.fromTablePath
+     throw new IOException("No configured input path matches the split path : " + paths);
+   }
+   AbsoluteTableIdentifier absoluteTableIdentifier =
+       AbsoluteTableIdentifier.fromTablePath(validInputPath);
+   // read the schema file to get the absoluteTableIdentifier having the correct table id
+   // persisted in the schema
+   CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+   setCarbonTable(configuration, carbonTable);
+ }
+
+ /**
+  * Loads the table schema for the given split path into the configuration and returns the
+  * CarbonTable deserialized from the CARBON_TABLE entry.
+  *
+  * @param configuration job configuration
+  * @param path path of the split being read
+  * @return the deserialized carbon table
+  * @throws IOException if populating or deserializing the table fails
+  */
+ private static CarbonTable getCarbonTable(Configuration configuration, String path)
+     throws IOException {
+   // presumably setCarbonTable stores the table under CARBON_TABLE - verify in CarbonInputFormat
+   populateCarbonTable(configuration, path);
+   // read it from schema file in the store
+   final Object deserializedTable =
+       ObjectSerializationUtil.convertStringToObject(configuration.get(CARBON_TABLE));
+   return (CarbonTable) deserializedTable;
+ }
+
+ private QueryModel getQueryModel(Configuration configuration, String path) throws IOException {
+ CarbonTable carbonTable = getCarbonTable(configuration, path);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization
StringBuilder colNames = new StringBuilder();
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
- // query plan includes projection column
- String projection = getColumnProjection(configuration);
- if (projection == null) {
- projection = configuration.get("hive.io.file.readcolumn.names");
- }
- if (projection.equals("")) {
- List<CarbonDimension> carbonDimensionList = carbonTable.getAllDimensions();
- List<CarbonMeasure> carbonMeasureList = carbonTable.getAllMeasures();
-
- for (CarbonDimension aCarbonDimensionList : carbonDimensionList) {
- colNames = new StringBuilder((colNames + (aCarbonDimensionList.getColName())) + ",");
- }
- if (carbonMeasureList.size() < 1) {
- colNames = new StringBuilder(colNames.substring(0, colNames.lastIndexOf(",")));
- }
- for (int index = 0; index < carbonMeasureList.size(); index++) {
- if (!carbonMeasureList.get(index).getColName().equals("default_dummy_measure")) {
- if (index == carbonMeasureList.size() - 1) {
- colNames.append(carbonMeasureList.get(index).getColName());
- } else {
- colNames =
- new StringBuilder((colNames + (carbonMeasureList.get(index).getColName())) + ",");
- }
- }
- }
- projection = colNames.toString().trim();
- configuration.set("hive.io.file.readcolumn.names", colNames.toString());
- }
+ String projection = getProjection(configuration, carbonTable,
+ identifier.getCarbonTableIdentifier().getTableName());
CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
// set the filter to the query model in order to filter blocklet before scan
@@ -115,6 +137,45 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
return queryModel;
}
+ /**
+  * Return the Projection for the CarbonQuery.
+  * The projection is taken from getColumnProjection, falling back to the
+  * "hive.io.file.readcolumn.names" property. Names that are not columns of the table are
+  * dropped; when no projection is configured, all columns are returned in create order.
+  *
+  * @param configuration job configuration holding the projection
+  * @param carbonTable table whose columns validate the projection
+  * @param tableName name used to look up the create order columns
+  * @return comma separated column names without a trailing comma; empty when no column
+  *         qualifies
+  */
+ private String getProjection(Configuration configuration, CarbonTable carbonTable,
+     String tableName) {
+   // query plan includes projection column
+   String projection = getColumnProjection(configuration);
+   if (projection == null) {
+     projection = configuration.get("hive.io.file.readcolumn.names");
+   }
+   List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(tableName);
+   List<String> carbonColumnNames = new ArrayList<>(carbonColumns.size());
+   for (CarbonColumn column : carbonColumns) {
+     carbonColumnNames.add(column.getColName());
+   }
+
+   List<String> selectedColumns;
+   // the property may be unset as well, so treat null like an empty projection
+   if (projection == null || projection.equals("")) {
+     selectedColumns = carbonColumnNames;
+   } else {
+     selectedColumns = new ArrayList<>();
+     //verify that the columns parsed by Hive exist in the table
+     for (String col : projection.split(",")) {
+       //show columns command will return these data
+       if (carbonColumnNames.contains(col)) {
+         selectedColumns.add(col);
+       }
+     }
+   }
+
+   // join with separators in between, so an empty selection yields "" instead of failing
+   // on substring(0, -1)
+   StringBuilder joined = new StringBuilder();
+   for (int i = 0; i < selectedColumns.size(); i++) {
+     if (i > 0) {
+       joined.append(',');
+     }
+     joined.append(selectedColumns.get(i));
+   }
+   return joined.toString();
+ }
+
@Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
return true;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
index d8705f8..ae931fb 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
@@ -130,6 +130,7 @@ public class HiveEmbeddedServer2 {
conf.set("hive.added.files.path", "");
conf.set("hive.added.archives.path", "");
conf.set("fs.default.name", "file:///");
+ conf.set(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname, "false");
// clear mapred.job.tracker - Hadoop defaults to 'local' if not defined. Hive however expects
// this to be set to 'local' - if it's not, it does a remote execution (i.e. no child JVM)
[4/5] carbondata git commit: [CARBONDATA-1283] Carbon should continue
with default value for wrong value in configured property
Posted by ja...@apache.org.
[CARBONDATA-1283] Carbon should continue with default value for wrong value in configured property
This closes #1155
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1a35cfb9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1a35cfb9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1a35cfb9
Branch: refs/heads/encoding_override
Commit: 1a35cfb90d0f4a4da05ec80f7a5c192f6832b36d
Parents: cbe1419
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Mon Jul 10 17:47:16 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Jul 12 19:28:44 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 14 ++
.../core/datastore/impl/FileFactory.java | 7 +-
.../carbondata/core/locks/HdfsFileLock.java | 5 +-
.../carbondata/core/util/CarbonProperties.java | 130 ++++++++++++++-
.../apache/carbondata/core/util/CarbonUtil.java | 13 +-
.../core/CarbonPropertiesValidationTest.java | 164 +++++++++++++++++++
6 files changed, 314 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8110abb..ccb6344 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -143,6 +143,11 @@ public final class CarbonCommonConstants {
* VIEWFSURL_PREFIX
*/
public static final String VIEWFSURL_PREFIX = "viewfs://";
+
+ /**
+ * ALLUXIO_PREFIX
+ */
+ public static final String ALLUXIOURL_PREFIX = "alluxio://";
/**
* FS_DEFAULT_FS
*/
@@ -329,6 +334,15 @@ public final class CarbonCommonConstants {
*/
public static final String CSV_READ_BUFFER_SIZE_DEFAULT = "50000";
/**
+ * min value for csv read buffer size
+ */
+ public static final int CSV_READ_BUFFER_SIZE_MIN = 10240; //10 kb
+ /**
+ * max value for csv read buffer size
+ */
+ public static final int CSV_READ_BUFFER_SIZE_MAX = 10485760; // 10 mb
+
+ /**
* CSV_READ_COPIES
*/
public static final String DEFAULT_NUMBER_CORES = "2";
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 2a35ab3..2794470 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.filesystem.*;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -74,13 +75,13 @@ public final class FileFactory {
}
public static FileType getFileType(String path) {
- if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
+ if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
return FileType.HDFS;
}
- else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
+ else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
return FileType.ALLUXIO;
}
- else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+ else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
return FileType.VIEWFS;
}
return FileType.LOCAL;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index 94e7307..326f8ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -54,8 +54,9 @@ public class HdfsFileLock extends AbstractCarbonLock {
// If can not get the STORE_LOCATION, then use hadoop.tmp.dir .
tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION,
System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
- if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
- && !tmpPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+ if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) && !tmpPath
+ .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) && !tmpPath
+ .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
tmpPath = hdfsPath + tmpPath;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c1e70ff..c9dd1ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -33,6 +33,8 @@ import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.hadoop.conf.Configuration;
+
public final class CarbonProperties {
/**
* Attribute for Carbon LOGGER.
@@ -98,6 +100,124 @@ public final class CarbonProperties {
validateBlockletGroupSizeInMB();
validateNumberOfColumnPerIORead();
validateNumberOfRowsPerBlockletColumnPage();
+ validateEnableUnsafeSort();
+ validateCustomBlockDistribution();
+ validateEnableVectorReader();
+ validateLockType();
+ validateCarbonCSVReadBufferSizeByte();
+ }
+
+ private void validateCarbonCSVReadBufferSizeByte() {
+ String csvReadBufferSizeStr =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ if (null != csvReadBufferSizeStr) {
+ try {
+ int bufferSize = Integer.parseInt(csvReadBufferSizeStr);
+ if (bufferSize < CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MIN
+ || bufferSize > CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MAX) {
+ LOGGER.warn("The value \"" + csvReadBufferSizeStr + "\" configured for key "
+ + CarbonCommonConstants.CSV_READ_BUFFER_SIZE
+ + "\" is not in range. Valid range is (byte) \""
+ + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MIN + " to \""
+ + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MAX + ". Using the default value \""
+ + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT);
+ carbonProperties.setProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT);
+ }
+ } catch (NumberFormatException nfe) {
+ LOGGER.warn("The value \"" + csvReadBufferSizeStr + "\" configured for key "
+ + CarbonCommonConstants.CSV_READ_BUFFER_SIZE
+ + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT);
+ carbonProperties.setProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT);
+ }
+ }
+ }
+
+ private void validateLockType() {
+ String lockTypeConfigured = carbonProperties.getProperty(CarbonCommonConstants.LOCK_TYPE);
+ if (null != lockTypeConfigured) {
+ switch (lockTypeConfigured.toUpperCase()) {
+ // If the user sets the lock type to CARBON_LOCK_TYPE_ZOOKEEPER, no validation is needed.
+ // Otherwise validate based on the file system type: for a LOCAL file system the lock is
+ // CARBON_LOCK_TYPE_LOCAL, and for a distributed one it is CARBON_LOCK_TYPE_HDFS.
+ case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
+ break;
+ case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
+ case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
+ default:
+ validateAndConfigureLockType(lockTypeConfigured);
+ }
+ } else {
+ validateAndConfigureLockType(lockTypeConfigured);
+ }
+ }
+
+ /**
+ * Decides and sets the lock type based on the configured file system type.
+ *
+ * @param lockTypeConfigured
+ */
+ private void validateAndConfigureLockType(String lockTypeConfigured) {
+ Configuration configuration = new Configuration(true);
+ String defaultFs = configuration.get("fs.defaultFS");
+ if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
+ || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs
+ .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX))
+ && !CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS.equalsIgnoreCase(lockTypeConfigured)) {
+ LOGGER.warn("The value \"" + lockTypeConfigured + "\" configured for key "
+ + CarbonCommonConstants.LOCK_TYPE + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS);
+ carbonProperties.setProperty(CarbonCommonConstants.LOCK_TYPE,
+ CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS);
+ } else if (null != defaultFs && defaultFs.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX)
+ && !CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL.equalsIgnoreCase(lockTypeConfigured)) {
+ carbonProperties.setProperty(CarbonCommonConstants.LOCK_TYPE,
+ CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL);
+ LOGGER.warn("The value \"" + lockTypeConfigured + "\" configured for key "
+ + CarbonCommonConstants.LOCK_TYPE
+ + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL);
+ }
+ }
+
+ private void validateEnableVectorReader() {
+ String vectorReaderStr =
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER);
+ boolean isValidBooleanValue = CarbonUtil.validateBoolean(vectorReaderStr);
+ if (!isValidBooleanValue) {
+ LOGGER.warn("The enable vector reader value \"" + vectorReaderStr
+ + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT);
+ carbonProperties.setProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT);
+ }
+ }
+
+ private void validateCustomBlockDistribution() {
+ String customBlockDistributionStr =
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION);
+ boolean isValidBooleanValue = CarbonUtil.validateBoolean(customBlockDistributionStr);
+ if (!isValidBooleanValue) {
+ LOGGER.warn("The custom block distribution value \"" + customBlockDistributionStr
+ + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT);
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT);
+ }
+ }
+
+ private void validateEnableUnsafeSort() {
+ String unSafeSortStr = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
+ boolean isValidBooleanValue = CarbonUtil.validateBoolean(unSafeSortStr);
+ if (!isValidBooleanValue) {
+ LOGGER.warn("The enable unsafe sort value \"" + unSafeSortStr
+ + "\" is invalid. Using the default value \""
+ + CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT);
+ carbonProperties.setProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT);
+ }
}
private void initPropertySet() throws IllegalAccessException {
@@ -330,12 +450,10 @@ public final class CarbonProperties {
}
private void validateHighCardinalityIdentify() {
- String highcardIdentifyStr = carbonProperties
- .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
- CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
- try {
- Boolean.parseBoolean(highcardIdentifyStr);
- } catch (NumberFormatException e) {
+ String highcardIdentifyStr =
+ carbonProperties.getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE);
+ boolean validateBoolean = CarbonUtil.validateBoolean(highcardIdentifyStr);
+ if (!validateBoolean) {
LOGGER.info("The high cardinality identify value \"" + highcardIdentifyStr
+ "\" is invalid. Using the default value \""
+ CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 06b2a61..1b08263 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -84,11 +84,6 @@ import org.apache.thrift.transport.TIOStreamTransport;
public final class CarbonUtil {
- public static final String HDFS_PREFIX = "hdfs://";
- public static final String VIEWFS_PREFIX = "viewfs://";
- public static final String ALLUXIO_PREFIX = "alluxio://";
- private static final String FS_DEFAULT_FS = "fs.defaultFS";
-
/**
* Attribute for Carbon LOGGER
*/
@@ -697,7 +692,7 @@ public final class CarbonUtil {
*/
public static String checkAndAppendHDFSUrl(String filePath) {
String currentPath = filePath;
- String defaultFsUrl = conf.get(FS_DEFAULT_FS);
+ String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
String baseDFSUrl = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
if (checkIfPrefixExists(filePath)) {
@@ -721,8 +716,10 @@ public final class CarbonUtil {
private static boolean checkIfPrefixExists(String path) {
final String lowerPath = path.toLowerCase();
- return lowerPath.startsWith(HDFS_PREFIX) || lowerPath.startsWith(VIEWFS_PREFIX) || lowerPath
- .startsWith("file://") || lowerPath.startsWith(ALLUXIO_PREFIX);
+ return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || lowerPath
+ .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || lowerPath
+ .startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) || lowerPath
+ .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX);
}
public static String getCarbonStorePath() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
new file mode 100644
index 0000000..e0262dc
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonProperty;
+
+import junit.framework.TestCase;
+import org.junit.Test;
+
+/**
+ * Tests the validation of the Carbon property configurations.
+ */
+public class CarbonPropertiesValidationTest extends TestCase {
+
+ CarbonProperties carbonProperties;
+
+ @Override public void setUp() throws Exception {
+ carbonProperties = CarbonProperties.getInstance();
+ }
+
+ @Test public void testvalidateLockType()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method validateMethodType = carbonProperties.getClass().getDeclaredMethod("validateLockType");
+ validateMethodType.setAccessible(true);
+ carbonProperties.addProperty(CarbonCommonConstants.LOCK_TYPE, "xyz");
+ String valueBeforeValidation = carbonProperties.getProperty(CarbonCommonConstants.LOCK_TYPE);
+ validateMethodType.invoke(carbonProperties);
+ String valueAfterValidation = carbonProperties.getProperty(CarbonCommonConstants.LOCK_TYPE);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL.equalsIgnoreCase(valueAfterValidation));
+ }
+
+ @Test public void testValidateEnableUnsafeSort()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method validateMethodType =
+ carbonProperties.getClass().getDeclaredMethod("validateEnableUnsafeSort");
+ validateMethodType.setAccessible(true);
+ carbonProperties.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "xyz");
+ String valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
+ validateMethodType.invoke(carbonProperties);
+ String valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+ }
+
+ @Test public void testValidateCustomBlockDistribution()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method validateMethodType =
+ carbonProperties.getClass().getDeclaredMethod("validateCustomBlockDistribution");
+ validateMethodType.setAccessible(true);
+ carbonProperties.addProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, "xyz");
+ String valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION);
+ validateMethodType.invoke(carbonProperties);
+ String valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT
+ .equalsIgnoreCase(valueAfterValidation));
+ }
+
+ @Test public void testValidateEnableVectorReader()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method validateMethodType =
+ carbonProperties.getClass().getDeclaredMethod("validateEnableVectorReader");
+ validateMethodType.setAccessible(true);
+ carbonProperties.addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "xyz");
+ String valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER);
+ validateMethodType.invoke(carbonProperties);
+ String valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+ }
+
+ @Test public void testValidateCarbonCSVReadBufferSizeByte()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method validateMethodType =
+ carbonProperties.getClass().getDeclaredMethod("validateCarbonCSVReadBufferSizeByte");
+ validateMethodType.setAccessible(true);
+ carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "xyz");
+ String valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ validateMethodType.invoke(carbonProperties);
+ String valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+ }
+
+ @Test public void testValidateCarbonCSVReadBufferSizeByteRange()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method validateMethodType =
+ carbonProperties.getClass().getDeclaredMethod("validateCarbonCSVReadBufferSizeByte");
+ validateMethodType.setAccessible(true);
+ carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "10485761");
+ String valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ validateMethodType.invoke(carbonProperties);
+ String valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+ carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "10240");
+ valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ validateMethodType.invoke(carbonProperties);
+ valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ assertTrue(valueBeforeValidation.equals(valueAfterValidation));
+ carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "10239");
+ valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ validateMethodType.invoke(carbonProperties);
+ valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(
+ CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+ }
+
+ @Test public void testValidateHighCardinalityIdentify()
+ throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Method validateMethodType =
+ carbonProperties.getClass().getDeclaredMethod("validateHighCardinalityIdentify");
+ validateMethodType.setAccessible(true);
+ carbonProperties.addProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE, "xyz");
+ String valueBeforeValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE);
+ validateMethodType.invoke(carbonProperties);
+ String valueAfterValidation =
+ carbonProperties.getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE);
+ assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+ assertTrue(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT
+ .equalsIgnoreCase(valueAfterValidation));
+ }
+}
[5/5] carbondata git commit: Update
supported-data-types-in-carbondata.md
Posted by ja...@apache.org.
Update supported-data-types-in-carbondata.md
This closes #1165
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e4da2a6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e4da2a6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e4da2a6
Branch: refs/heads/encoding_override
Commit: 9e4da2a6caeef6acd637e49a29c70d2eedd4a504
Parents: 1a35cfb
Author: chenerlu <ch...@huawei.com>
Authored: Wed Jul 12 18:50:02 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 12 22:04:50 2017 +0800
----------------------------------------------------------------------
docs/supported-data-types-in-carbondata.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e4da2a6/docs/supported-data-types-in-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/supported-data-types-in-carbondata.md b/docs/supported-data-types-in-carbondata.md
index 8f271e3..561248c 100644
--- a/docs/supported-data-types-in-carbondata.md
+++ b/docs/supported-data-types-in-carbondata.md
@@ -35,6 +35,7 @@
* String Types
* STRING
* CHAR
+ * VARCHAR
* Complex Types
* arrays: ARRAY``<data_type>``