Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/12 15:06:27 UTC

[1/5] carbondata git commit: [CARBONDATA-1289] remove unused method

Repository: carbondata
Updated Branches:
  refs/heads/encoding_override 403c3d9b4 -> 9e4da2a6c


[CARBONDATA-1289] remove unused method

This closes #1157


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b31f09b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b31f09b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b31f09b

Branch: refs/heads/encoding_override
Commit: 8b31f09b638cb5b8fbbdfa56f29f8c97e68e6aa6
Parents: 403c3d9
Author: czg516516 <cz...@163.com>
Authored: Tue Jul 11 11:01:47 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Jul 11 12:56:18 2017 +0800

----------------------------------------------------------------------
 .../newflow/exception/CarbonDataLoadingException.java  | 13 -------------
 1 file changed, 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b31f09b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
index b9593e7..6ffdd03 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.processing.newflow.exception;
 
-import java.util.Locale;
-
 public class CarbonDataLoadingException extends RuntimeException {
   /**
    * default serial version ID.
@@ -60,17 +58,6 @@ public class CarbonDataLoadingException extends RuntimeException {
   }
 
   /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
    * getLocalizedMessage
    */
   @Override public String getLocalizedMessage() {


[2/5] carbondata git commit: [CARBONDATA-1277] Dictionary generation failure due to HDFS lease expiry

Posted by ja...@apache.org.
[CARBONDATA-1277] Dictionary generation failure due to HDFS lease expiry

This closes #1147


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/285ce72d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/285ce72d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/285ce72d

Branch: refs/heads/encoding_override
Commit: 285ce72d4c9b3364bbdc454f4b6b331b3caa42db
Parents: 8b31f09
Author: manishgupta88 <to...@gmail.com>
Authored: Sat Jul 8 15:46:25 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Jul 11 18:00:18 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   7 +
 .../core/datastore/impl/FileFactory.java        |  21 ++
 .../AtomicFileOperationsImpl.java               |   5 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   2 +-
 .../core/util/path/HDFSLeaseUtils.java          | 215 +++++++++++++++++++
 .../core/writer/CarbonDictionaryWriterImpl.java |  20 +-
 .../carbondata/core/writer/ThriftWriter.java    |   2 +-
 7 files changed, 266 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 208bab8..8110abb 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1287,6 +1287,13 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
 
+  @CarbonProperty
+  public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT =
+      "carbon.lease.recovery.retry.count";
+  @CarbonProperty
+  public static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL =
+      "carbon.lease.recovery.retry.interval";
+
   private CarbonCommonConstants() {
   }
 }
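
The two properties added above bound the HDFS lease recovery retry loop introduced by this commit: the retry count is validated against the range [1, 50] (default 5) and the retry interval against [1000, 10000] ms (default 1000), as shown in HDFSLeaseUtils below. A minimal sketch of tuning them, assuming the usual CarbonProperties.addProperty() setter:

  // hypothetical tuning before a data load; out-of-range values fall back
  // to the defaults with a warning
  CarbonProperties props = CarbonProperties.getInstance();
  props.addProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT, "10");
  props.addProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL, "2000");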

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 7acd6b1..2a35ab3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -518,4 +518,25 @@ public final class FileFactory {
     }
   }
 
+  /**
+   * This method will create the path object for a given file
+   *
+   * @param filePath
+   * @return
+   */
+  public static Path getPath(String filePath) {
+    return new Path(filePath);
+  }
+
+  /**
+   * This method will return the filesystem instance
+   *
+   * @param path
+   * @return
+   * @throws IOException
+   */
+  public static FileSystem getFileSystem(Path path) throws IOException {
+    return path.getFileSystem(configuration);
+  }
+
 }
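
Together, the two new helpers let a caller go from a path string to its FileSystem instance in one step. A small usage sketch (the path below is hypothetical):

  Path dictFilePath = FileFactory.getPath("hdfs://namenode:8020/carbonstore/db/tbl/col.dict");
  FileSystem fs = FileFactory.getFileSystem(dictFilePath);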

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
index befc76e..61690ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 public class AtomicFileOperationsImpl implements AtomicFileOperations {
 
@@ -67,10 +68,8 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations {
   @Override public void close() throws IOException {
 
     if (null != dataOutStream) {
-      dataOutStream.close();
-
+      CarbonUtil.closeStream(dataOutStream);
       CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-
       if (!tempFile.renameForce(filePath)) {
         throw new IOException("temporary file renaming failed, src="
             + tempFile.getPath() + ", dest=" + filePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8298600..06b2a61 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -128,7 +128,7 @@ public final class CarbonUtil {
         try {
           closeStream(stream);
         } catch (IOException e) {
-          LOGGER.error("Error while closing stream:" + e);
+          LOGGER.error(e, "Error while closing stream:" + e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
new file mode 100644
index 0000000..c72c322
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+
+/**
+ * Utility methods for HDFS lease recovery
+ */
+public class HDFSLeaseUtils {
+
+  private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN = 1;
+  private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX = 50;
+  private static final String CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT = "5";
+  private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN = 1000;
+  private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX = 10000;
+  private static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT = "1000";
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HDFSLeaseUtils.class.getName());
+
+  /**
+   * This method will validate whether the exception thrown is for lease recovery from HDFS
+   *
+   * @param message
+   * @return
+   */
+  public static boolean checkExceptionMessageForLeaseRecovery(String message) {
+    // depending on the scenario, more cases can be added here to validate a lease recovery exception
+    if (null != message && message.contains("Failed to APPEND_FILE")) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * This method will make attempts to recover lease on a file using the
+   * distributed file system utility.
+   *
+   * @param filePath
+   * @return
+   * @throws IOException
+   */
+  public static boolean recoverFileLease(String filePath) throws IOException {
+    LOGGER.info("Trying to recover lease on file: " + filePath);
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    switch (fileType) {
+      case ALLUXIO:
+      case HDFS:
+        Path path = FileFactory.getPath(filePath);
+        FileSystem fs = FileFactory.getFileSystem(path);
+        return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs);
+      case VIEWFS:
+        path = FileFactory.getPath(filePath);
+        fs = FileFactory.getFileSystem(path);
+        ViewFileSystem viewFileSystem = (ViewFileSystem) fs;
+        Path targetFileSystemPath = viewFileSystem.resolvePath(path);
+        FileSystem targetFileSystem = FileFactory.getFileSystem(targetFileSystemPath);
+        if (targetFileSystem instanceof DistributedFileSystem) {
+          return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) targetFileSystem);
+        } else {
+          LOGGER.error(
+              "Invalid file type. Lease recovery is not supported on filesystem with file: "
+                  + filePath);
+          return false;
+        }
+      default:
+        LOGGER.error("Invalid file type. Lease recovery is not supported on filesystem with file: "
+            + filePath);
+        return false;
+    }
+  }
+
+  /**
+   * Recovers lease on a file
+   *
+   * @param filePath
+   * @param path
+   * @param fs
+   * @return
+   * @throws IOException
+   */
+  private static boolean recoverLeaseOnFile(String filePath, Path path, DistributedFileSystem fs)
+      throws IOException {
+    DistributedFileSystem dfs = fs;
+    int maxAttempts = getLeaseRecoveryRetryCount();
+    int retryInterval = getLeaseRecoveryRetryInterval();
+    boolean leaseRecovered = false;
+    IOException ioException = null;
+    for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {
+      try {
+        leaseRecovered = dfs.recoverLease(path);
+        if (!leaseRecovered) {
+          try {
+            LOGGER.info(
+                "Failed to recover lease after attempt " + retryCount + " . Will try again after "
+                    + retryInterval + " ms...");
+            Thread.sleep(retryInterval);
+          } catch (InterruptedException e) {
+            LOGGER.error(e,
+                "Interrupted exception occurred while recovering lease for file : " + filePath);
+          }
+        }
+      } catch (IOException e) {
+        if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
+          LOGGER.error("The given file does not exist at path " + filePath);
+          throw e;
+        } else if (e instanceof FileNotFoundException) {
+          LOGGER.error("The given file does not exist at path " + filePath);
+          throw e;
+        } else {
+          LOGGER.error("Recover lease threw exception : " + e.getMessage());
+          ioException = e;
+        }
+      }
+      LOGGER.info("Retrying again after interval of " + retryInterval + " ms...");
+    }
+    if (leaseRecovered) {
+      LOGGER.info("Successfully able to recover lease on file: " + filePath);
+      return true;
+    } else {
+      LOGGER.error(
+          "Failed to recover lease on file: " + filePath + " after retrying for " + maxAttempts
+              + " at an interval of " + retryInterval);
+      if (null != ioException) {
+        throw ioException;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  private static int getLeaseRecoveryRetryCount() {
+    String retryMaxAttempts = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT,
+            CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
+    int retryCount = 0;
+    try {
+      retryCount = Integer.parseInt(retryMaxAttempts);
+      if (retryCount < CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN
+          || retryCount > CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX) {
+        retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
+        LOGGER.warn(
+            "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT
+                + " is not in allowed range. Allowed range is >="
+                + CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN + " and <="
+                + CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX + ". Therefore considering default value: "
+                + retryCount);
+      }
+    } catch (NumberFormatException ne) {
+      retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT);
+      LOGGER.warn("value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT
+          + " is incorrect. Therefore considering default value: " + retryCount);
+    }
+    return retryCount;
+  }
+
+  private static int getLeaseRecoveryRetryInterval() {
+    String configuredInterval = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL,
+            CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
+    int retryInterval = 0;
+    try {
+      retryInterval = Integer.parseInt(configuredInterval);
+      if (retryInterval < CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN
+          || retryInterval > CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX) {
+        retryInterval = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
+        LOGGER.warn(
+            "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL
+                + " is not in allowed range. Allowed range is >="
+                + CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN + " and <="
+                + CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX
+                + ". Therefore considering default value (ms): " + retryInterval);
+      }
+    } catch (NumberFormatException ne) {
+      retryInterval = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT);
+      LOGGER.warn(
+          "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL
+              + " is incorrect. Therefore considering default value (ms): " + retryInterval);
+    }
+    return retryInterval;
+  }
+
+}
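
The intended usage pattern, as applied to the dictionary writer later in this commit, is roughly the following sketch (writer and filePath are placeholders):

  try {
    writer.open();
  } catch (IOException e) {
    if (HDFSLeaseUtils.checkExceptionMessageForLeaseRecovery(e.getMessage())
        && HDFSLeaseUtils.recoverFileLease(filePath)) {
      // retry the open once after the lease has been recovered
      writer.open();
    } else {
      throw e;
    }
  }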

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
index 9de41e1..64ff202 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.service.PathService;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.util.path.HDFSLeaseUtils;
 import org.apache.carbondata.format.ColumnDictionaryChunk;
 import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
 
@@ -359,7 +360,24 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter {
     // create thrift writer instance
     dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true);
     // open the file stream
-    dictionaryThriftWriter.open();
+    try {
+      dictionaryThriftWriter.open();
+    } catch (IOException e) {
+      // Cases to handle
+      // 1. Handle File lease recovery
+      if (HDFSLeaseUtils.checkExceptionMessageForLeaseRecovery(e.getMessage())) {
+        LOGGER.error(e, "Lease recovery exception encountered for file: " + dictionaryFile);
+        boolean leaseRecovered = HDFSLeaseUtils.recoverFileLease(dictionaryFile);
+        if (leaseRecovered) {
+          // try to open output stream again after recovering the lease on file
+          dictionaryThriftWriter.open();
+        } else {
+          throw e;
+        }
+      } else {
+        throw e;
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
index 9bf549d..d7b1a0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
@@ -136,7 +136,7 @@ public class ThriftWriter {
    */
   public void close() throws IOException {
     closeAtomicFileWriter();
-    CarbonUtil.closeStreams(dataOutputStream);
+    CarbonUtil.closeStream(dataOutputStream);
   }
 
   /**


[3/5] carbondata git commit: [CARBONDATA-1271] Enhanced Performance for Hive Integration with CarbonData

Posted by ja...@apache.org.
[CARBONDATA-1271] Enhanced Performance for Hive Integration with CarbonData

This closes #1142


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cbe14197
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cbe14197
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cbe14197

Branch: refs/heads/encoding_override
Commit: cbe141976a53a558b84d6e31baf3ec54a9bc38cc
Parents: 285ce72
Author: Bhavya <bh...@knoldus.com>
Authored: Thu Jul 6 11:53:03 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 12 17:40:11 2017 +0800

----------------------------------------------------------------------
 .../core/stats/QueryStatisticsRecorderImpl.java |  80 +++---
 .../carbondata/hadoop/CarbonInputFormat.java    |   7 +-
 .../carbondata/hive/CarbonArrayInspector.java   |   4 -
 .../hive/CarbonDictionaryDecodeReadSupport.java | 288 +++++++++++++++++++
 .../carbondata/hive/CarbonHiveInputSplit.java   |  23 +-
 .../carbondata/hive/CarbonHiveRecordReader.java |  67 ++---
 .../apache/carbondata/hive/CarbonHiveSerDe.java |  36 +--
 .../hive/MapredCarbonInputFormat.java           | 129 ++++++---
 .../hive/server/HiveEmbeddedServer2.java        |   1 +
 9 files changed, 477 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
index f84a674..ffb7d7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsRecorderImpl.java
@@ -101,45 +101,47 @@ public class QueryStatisticsRecorderImpl implements QueryStatisticsRecorder, Ser
     long scannedPages = 0;
     try {
       for (QueryStatistic statistic : queryStatistics) {
-        switch (statistic.getMessage()) {
-          case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
-            load_blocks_time += statistic.getTimeTaken();
-            break;
-          case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
-            scan_blocks_time += statistic.getCount();
-            break;
-          case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
-            scan_blocks_num += statistic.getCount();
-            break;
-          case QueryStatisticsConstants.LOAD_DICTIONARY:
-            load_dictionary_time += statistic.getTimeTaken();
-            break;
-          case QueryStatisticsConstants.RESULT_SIZE:
-            result_size += statistic.getCount();
-            break;
-          case QueryStatisticsConstants.EXECUTOR_PART:
-            total_executor_time += statistic.getTimeTaken();
-            break;
-          case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
-            total_blocklet = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
-            valid_scan_blocklet = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.VALID_PAGE_SCANNED:
-            valid_pages_blocklet = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
-            total_pages = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.READ_BLOCKlET_TIME:
-            readTime = statistic.getCount();
-            break;
-          case QueryStatisticsConstants.PAGE_SCANNED:
-            scannedPages = statistic.getCount();
-            break;
-          default:
-            break;
+        if (statistic.getMessage() != null) {
+          switch (statistic.getMessage()) {
+            case QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR:
+              load_blocks_time += statistic.getTimeTaken();
+              break;
+            case QueryStatisticsConstants.SCAN_BLOCKlET_TIME:
+              scan_blocks_time += statistic.getCount();
+              break;
+            case QueryStatisticsConstants.SCAN_BLOCKS_NUM:
+              scan_blocks_num += statistic.getCount();
+              break;
+            case QueryStatisticsConstants.LOAD_DICTIONARY:
+              load_dictionary_time += statistic.getTimeTaken();
+              break;
+            case QueryStatisticsConstants.RESULT_SIZE:
+              result_size += statistic.getCount();
+              break;
+            case QueryStatisticsConstants.EXECUTOR_PART:
+              total_executor_time += statistic.getTimeTaken();
+              break;
+            case QueryStatisticsConstants.TOTAL_BLOCKLET_NUM:
+              total_blocklet = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM:
+              valid_scan_blocklet = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.VALID_PAGE_SCANNED:
+              valid_pages_blocklet = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.TOTAL_PAGE_SCANNED:
+              total_pages = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.READ_BLOCKlET_TIME:
+              readTime = statistic.getCount();
+              break;
+            case QueryStatisticsConstants.PAGE_SCANNED:
+              scannedPages = statistic.getCount();
+              break;
+            default:
+              break;
+          }
         }
       }
       String headers =
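
The new null guard matters because a Java switch on a null String throws NullPointerException, so a single statistic whose message was never set would previously abort the whole summary. In short:

  String message = statistic.getMessage();
  if (message != null) {   // switch (null) would throw NullPointerException
    switch (message) { /* aggregate the counters as above */ }
  }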

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1e69648..16b5d69 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -444,9 +444,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
           }
         }
       }
+
+      // For the Hive integration, fall back to hive.query.id when query.id is not set
+      String queryId = job.getConfiguration().get("query.id") != null ?
+          job.getConfiguration().get("query.id") :
+          job.getConfiguration().get("hive.query.id");
       statistic
           .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
-      recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+      recorder.recordStatisticsForDriver(statistic, queryId);
       return resultFilterredBlocks;
     } finally {
       // clean up the access count for a segment as soon as its usage is complete so that in

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
index 49e068a..b26c959 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.hive;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -122,9 +121,6 @@ class CarbonArrayInspector implements SettableListObjectInspector {
 
       final Writable[] array = ((ArrayWritable) subObj).get();
       final List<Writable> list = Arrays.asList(array);
-
-      Collections.addAll(list, array);
-
       return list;
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
new file mode 100644
index 0000000..bc66d49
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+
+/**
+ *  This class decodes dictionary-encoded column data back to its original values.
+ */
+public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
+
+  protected Dictionary[] dictionaries;
+
+  protected DataType[] dataTypes;
+  /**
+   * carbon columns
+   */
+  protected CarbonColumn[] carbonColumns;
+
+  protected Writable[] writableArr;
+
+  /**
+   * This initialization is done inside the executor task
+   * for the column dictionaries involved in decoding.
+   *
+   * @param carbonColumns           column list
+   * @param absoluteTableIdentifier table identifier
+   */
+  @Override public void initialize(CarbonColumn[] carbonColumns,
+      AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
+    this.carbonColumns = carbonColumns;
+    dictionaries = new Dictionary[carbonColumns.length];
+    dataTypes = new DataType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i]
+          .hasEncoding(Encoding.DIRECT_DICTIONARY) && !carbonColumns[i].isComplex()) {
+        CacheProvider cacheProvider = CacheProvider.getInstance();
+        Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
+            .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+        dataTypes[i] = carbonColumns[i].getDataType();
+        dictionaries[i] = forwardDictionaryCache.get(
+            new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier.getCarbonTableIdentifier(),
+                carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
+      } else {
+        dataTypes[i] = carbonColumns[i].getDataType();
+      }
+    }
+  }
+
+  @Override public T readRow(Object[] data) {
+    assert (data.length == dictionaries.length);
+    writableArr = new Writable[data.length];
+    for (int i = 0; i < dictionaries.length; i++) {
+      if (dictionaries[i] != null) {
+        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+      }
+      try {
+        writableArr[i] = createWritableObject(data[i], carbonColumns[i]);
+      } catch (IOException e) {
+        throw new RuntimeException(e.getMessage(), e);
+      }
+    }
+
+    return (T) writableArr;
+  }
+
+  /**
+   * Book-keeps the dictionary cache by updating the access count for each
+   * column involved in decoding, to facilitate the LRU cache policy when the
+   * memory threshold is reached.
+   */
+  @Override public void close() {
+    if (dictionaries == null) {
+      return;
+    }
+    for (int i = 0; i < dictionaries.length; i++) {
+      CarbonUtil.clearDictionaryCache(dictionaries[i]);
+    }
+  }
+
+  /**
+   * Creates the Writable object from the CarbonData value.
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private Writable createWritableObject(Object obj, CarbonColumn carbonColumn) throws IOException {
+    DataType dataType = carbonColumn.getDataType();
+    switch (dataType) {
+      case STRUCT:
+        return createStruct(obj, carbonColumn);
+      case ARRAY:
+        return createArray(obj, carbonColumn);
+      default:
+        return createWritablePrimitive(obj, carbonColumn);
+    }
+  }
+
+  /**
+   * Creates the array data for the ARRAY data type.
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private ArrayWritable createArray(Object obj, CarbonColumn carbonColumn) throws IOException {
+    if (obj instanceof GenericArrayData) {
+      Object[] objArray = ((GenericArrayData) obj).array();
+      List<CarbonDimension> childCarbonDimensions = null;
+      CarbonDimension arrayDimension = null;
+      if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+        childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        arrayDimension = childCarbonDimensions.get(0);
+      }
+      List array = new ArrayList();
+      if (objArray != null) {
+        for (int i = 0; i < objArray.length; i++) {
+          Object curObj = objArray[i];
+          Writable newObj = createWritableObject(curObj, arrayDimension);
+          array.add(newObj);
+        }
+      }
+      if (array.size() > 0) {
+        ArrayWritable subArray = new ArrayWritable(Writable.class,
+            (Writable[]) array.toArray(new Writable[array.size()]));
+        return new ArrayWritable(Writable.class, new Writable[] { subArray });
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates the struct data for the STRUCT data type.
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private ArrayWritable createStruct(Object obj, CarbonColumn carbonColumn) throws IOException {
+    if (obj instanceof GenericInternalRow) {
+      Object[] objArray = ((GenericInternalRow) obj).values();
+      List<CarbonDimension> childCarbonDimensions = null;
+      if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) {
+        childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      }
+      Writable[] arr = new Writable[objArray.length];
+      for (int i = 0; i < objArray.length; i++) {
+
+        arr[i] = createWritableObject(objArray[i], childCarbonDimensions.get(i));
+      }
+      return new ArrayWritable(Writable.class, arr);
+    }
+    throw new IOException("DataType not supported in Carbondata");
+  }
+
+  /**
+   * This method will create the Writable Objects for primitives.
+   *
+   * @param obj
+   * @param carbonColumn
+   * @return
+   * @throws IOException
+   */
+  private Writable createWritablePrimitive(Object obj, CarbonColumn carbonColumn)
+      throws IOException {
+    DataType dataType = carbonColumn.getDataType();
+    if (obj == null) {
+      return null;
+    }
+    switch (dataType) {
+      case NULL:
+        return null;
+      case DOUBLE:
+        return new DoubleWritable((double) obj);
+      case INT:
+        return new IntWritable((int) obj);
+      case LONG:
+        return new LongWritable((long) obj);
+      case SHORT:
+        return new ShortWritable((Short) obj);
+      case DATE:
+        return new DateWritable(new Date((Integer) obj));
+      case TIMESTAMP:
+        return new TimestampWritable(new Timestamp((long) obj));
+      case STRING:
+        return new Text(obj.toString());
+      case DECIMAL:
+        return new HiveDecimalWritable(
+            HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+    }
+    throw new IOException("Unknown primitive : " + dataType.getName());
+  }
+
+  /**
+   * This method can be used when the same Writable[] instance is to be reused.
+   *
+   * @param writable
+   * @param obj
+   * @param carbonColumn
+   * @throws IOException
+   */
+  private void setPrimitive(Writable writable, Object obj, CarbonColumn carbonColumn)
+      throws IOException {
+    DataType dataType = carbonColumn.getDataType();
+    if (obj == null) {
+      writable.write(null);
+    }
+    switch (dataType) {
+      case DOUBLE:
+        ((DoubleWritable) writable).set((double) obj);
+        break;
+      case INT:
+        ((IntWritable) writable).set((int) obj);
+        break;
+      case LONG:
+        ((LongWritable) writable).set((long) obj);
+        break;
+      case SHORT:
+        ((ShortWritable) writable).set((short) obj);
+        break;
+      case DATE:
+        ((DateWritable) writable).set(new Date((Long) obj));
+        break;
+      case TIMESTAMP:
+        ((TimestampWritable) writable).set(new Timestamp((long) obj));
+        break;
+      case STRING:
+        ((Text) writable).set(obj.toString());
+        break;
+      case DECIMAL:
+        ((HiveDecimalWritable) writable)
+            .set(HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+    }
+  }
+
+}
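
Outside of the Hive record reader, the read support can be driven directly. A hedged sketch of the decode loop, where carbonColumns, identifier and rawRows stand in for values normally supplied by the query model, and the enclosing method is assumed to declare IOException:

  CarbonReadSupport<ArrayWritable> support = new CarbonDictionaryDecodeReadSupport<>();
  support.initialize(carbonColumns, identifier);
  try {
    for (Object[] rawRow : rawRows) {
      // readRow resolves dictionary keys and wraps each value in a Writable
      Object decoded = support.readRow(rawRow);
      Writable[] writables = (Writable[]) decoded;
    }
  } finally {
    support.close();  // releases the dictionary cache access counts
  }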

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
index bfe4d27..b922295 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
@@ -113,8 +113,7 @@ public class CarbonHiveInputSplit extends FileSplit
   }
 
   public static CarbonHiveInputSplit from(String segmentId, FileSplit split,
-      ColumnarFormatVersion version)
-      throws IOException {
+      ColumnarFormatVersion version) throws IOException {
     return new CarbonHiveInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
         split.getLocations(), version);
   }
@@ -151,8 +150,7 @@ public class CarbonHiveInputSplit extends FileSplit
     return segmentId;
   }
 
-  @Override
-  public void readFields(DataInput in) throws IOException {
+  @Override public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.segmentId = in.readUTF();
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
@@ -162,10 +160,10 @@ public class CarbonHiveInputSplit extends FileSplit
     for (int i = 0; i < numInvalidSegment; i++) {
       invalidSegments.add(in.readUTF());
     }
+    this.numberOfBlocklets = in.readInt();
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
+  @Override public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeUTF(segmentId);
     out.writeShort(version.number());
@@ -174,6 +172,7 @@ public class CarbonHiveInputSplit extends FileSplit
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
     }
+    out.writeInt(numberOfBlocklets);
   }
 
   public List<String> getInvalidSegments() {
@@ -213,8 +212,7 @@ public class CarbonHiveInputSplit extends FileSplit
     return bucketId;
   }
 
-  @Override
-  public int compareTo(Distributable o) {
+  @Override public int compareTo(Distributable o) {
     if (o == null) {
       return -1;
     }
@@ -264,18 +262,15 @@ public class CarbonHiveInputSplit extends FileSplit
     return 0;
   }
 
-  @Override
-  public String getBlockPath() {
+  @Override public String getBlockPath() {
     return getPath().getName();
   }
 
-  @Override
-  public List<Long> getMatchedBlocklets() {
+  @Override public List<Long> getMatchedBlocklets() {
     return null;
   }
 
-  @Override
-  public boolean fullScan() {
+  @Override public boolean fullScan() {
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index e4df02e..2a92185 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -62,6 +62,8 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
 
   private ArrayWritable valueObj = null;
   private CarbonObjectInspector objInspector;
+  private long recordReaderCounter = 0;
+  private int[] columnIds;
 
   public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
       InputSplit inputSplit, JobConf jobConf) throws IOException {
@@ -88,17 +90,12 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     } catch (QueryExecutionException e) {
       throw new IOException(e.getMessage(), e.getCause());
     }
-    if (valueObj == null) {
-      valueObj =
-          new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]);
-    }
-
     final TypeInfo rowTypeInfo;
     final List<String> columnNames;
     List<TypeInfo> columnTypes;
     // Get column names and sort order
     final String colIds = conf.get("hive.io.file.readcolumn.ids");
-    final String columnNameProperty = conf.get("hive.io.file.readcolumn.names");
+    final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
 
     if (columnNameProperty.length() == 0) {
@@ -111,47 +108,39 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     } else {
       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
     }
+
+    if (valueObj == null) {
+      valueObj = new ArrayWritable(Writable.class, new Writable[columnTypes.size()]);
+    }
+
     if (!colIds.equals("")) {
       String[] arraySelectedColId = colIds.split(",");
       List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
-
-      for (String anArrayColId : arraySelectedColId) {
-        reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
+      columnIds = new int[arraySelectedColId.length];
+      int columnId = 0;
+      for (int j = 0; j < arraySelectedColId.length; j++) {
+        columnId = Integer.parseInt(arraySelectedColId[j]);
+        columnIds[j] = columnId;
       }
-      // Create row related objects
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-    } else {
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
     }
+
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
   }
 
   @Override public boolean next(Void aVoid, ArrayWritable value) throws IOException {
     if (carbonIterator.hasNext()) {
       Object obj = readSupport.readRow(carbonIterator.next());
-      ArrayWritable tmpValue;
-      try {
-        tmpValue = createArrayWritable(obj);
-      } catch (SerDeException se) {
-        throw new IOException(se.getMessage(), se.getCause());
-      }
-
-      if (value != tmpValue) {
-        final Writable[] arrValue = value.get();
-        final Writable[] arrCurrent = tmpValue.get();
-        if (valueObj != null && arrValue.length == arrCurrent.length) {
-          System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
-        } else {
-          if (arrValue.length != arrCurrent.length) {
-            throw new IOException(
-                "CarbonHiveInput : size of object differs. Value" + " size :  " + arrValue.length
-                    + ", Current Object size : " + arrCurrent.length);
-          } else {
-            throw new IOException("CarbonHiveInput can not support RecordReaders that"
-                + " don't return same key & value & value is null");
-          }
+      recordReaderCounter++;
+      Writable[] objArray = (Writable[]) obj;
+      Writable[] sysArray = new Writable[value.get().length];
+      if (columnIds != null && columnIds.length > 0 && objArray.length == columnIds.length) {
+        for (int i = 0; i < columnIds.length; i++) {
+          sysArray[columnIds[i]] = objArray[i];
         }
+        value.set(sysArray);
+      } else {
+        value.set(objArray);
       }
       return true;
     } else {
@@ -159,10 +148,6 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
     }
   }
 
-  private ArrayWritable createArrayWritable(Object obj) throws SerDeException {
-    return createStruct(obj, objInspector);
-  }
-
   @Override public Void createKey() {
     return null;
   }
@@ -172,7 +157,7 @@ class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
   }
 
   @Override public long getPos() throws IOException {
-    return 0;
+    return recordReaderCounter;
   }
 
   @Override public float getProgress() throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index f66f3ed..2980ad3 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -79,11 +79,9 @@ class CarbonHiveSerDe extends AbstractSerDe {
 
     final TypeInfo rowTypeInfo;
     final List<String> columnNames;
-    final List<String> reqColNames;
     final List<TypeInfo> columnTypes;
     // Get column names and sort order
     assert configuration != null;
-    final String colIds = configuration.get("hive.io.file.readcolumn.ids");
 
     final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
@@ -98,29 +96,17 @@ class CarbonHiveSerDe extends AbstractSerDe {
     } else {
       columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
     }
-    if (colIds != null && !colIds.equals("")) {
-      reqColNames = new ArrayList<String>();
-
-      String[] arraySelectedColId = colIds.split(",");
-      List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
-      for (String anArrayColId : arraySelectedColId) {
-        reqColNames.add(columnNames.get(Integer.parseInt(anArrayColId)));
-        reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
-      }
-      // Create row related objects
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(reqColNames, reqColTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-    }
-    else {
-      // Create row related objects
-      rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
-      this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
-
-      // Stats part
-      serializedSize = 0;
-      deserializedSize = 0;
-      status = LAST_OPERATION.UNKNOWN;
-    }
+
+    // Create row related objects
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
+
+    // Stats part
+    serializedSize = 0;
+    deserializedSize = 0;
+    status = LAST_OPERATION.UNKNOWN;
   }
 
   @Override public Class<? extends Writable> getSerializedClass() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 7a1c9db..58f25c9 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -17,12 +17,12 @@
 package org.apache.carbondata.hive;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
@@ -31,8 +31,11 @@ import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.io.ArrayWritable;
@@ -42,9 +45,11 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
 
 public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {
+  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
 
   @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
     org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
@@ -63,47 +68,64 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
   @Override
   public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
       Reporter reporter) throws IOException {
-    QueryModel queryModel = getQueryModel(jobConf);
-    CarbonReadSupport<ArrayWritable> readSupport = getReadSupportClass(jobConf);
+    String path = null;
+    if (inputSplit instanceof CarbonHiveInputSplit) {
+      path = ((CarbonHiveInputSplit) inputSplit).getPath().toString();
+    }
+    QueryModel queryModel = getQueryModel(jobConf, path);
+    CarbonReadSupport<ArrayWritable> readSupport = new CarbonDictionaryDecodeReadSupport<>();
     return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
   }
 
-  private QueryModel getQueryModel(Configuration configuration) throws IOException {
-    CarbonTable carbonTable = getCarbonTable(configuration);
+  /**
+   * This method will read the schema from the physical schema file and populate it into CARBON_TABLE.
+   *
+   * @param configuration
+   * @throws IOException
+   */
+  private static void populateCarbonTable(Configuration configuration, String paths)
+      throws IOException {
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    String validInputPath = null;
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    } else {
+      if (paths != null) {
+        for (String inputPath : inputPaths) {
+          if (paths.startsWith(inputPath)) {
+            validInputPath = inputPath;
+            break;
+          }
+        }
+      }
+    }
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.fromTablePath(validInputPath);
+    // read the schema file to get the absoluteTableIdentifier having the correct table id
+    // persisted in the schema
+    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    setCarbonTable(configuration, carbonTable);
+  }
+
+  private static CarbonTable getCarbonTable(Configuration configuration, String path)
+      throws IOException {
+    populateCarbonTable(configuration, path);
+    // read it from schema file in the store
+    String carbonTableStr = configuration.get(CARBON_TABLE);
+    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+  }
+
+  private QueryModel getQueryModel(Configuration configuration, String path) throws IOException {
+    CarbonTable carbonTable = getCarbonTable(configuration, path);
     // getting the table absoluteTableIdentifier from the carbonTable
     // to avoid unnecessary deserialization
 
     StringBuilder colNames = new StringBuilder();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
 
-    // query plan includes projection column
-    String projection = getColumnProjection(configuration);
-    if (projection == null) {
-      projection = configuration.get("hive.io.file.readcolumn.names");
-    }
-    if (projection.equals("")) {
-      List<CarbonDimension> carbonDimensionList = carbonTable.getAllDimensions();
-      List<CarbonMeasure> carbonMeasureList = carbonTable.getAllMeasures();
-
-      for (CarbonDimension aCarbonDimensionList : carbonDimensionList) {
-        colNames = new StringBuilder((colNames + (aCarbonDimensionList.getColName())) + ",");
-      }
-      if (carbonMeasureList.size() < 1) {
-        colNames = new StringBuilder(colNames.substring(0, colNames.lastIndexOf(",")));
-      }
-      for (int index = 0; index < carbonMeasureList.size(); index++) {
-        if (!carbonMeasureList.get(index).getColName().equals("default_dummy_measure")) {
-          if (index == carbonMeasureList.size() - 1) {
-            colNames.append(carbonMeasureList.get(index).getColName());
-          } else {
-            colNames =
-                new StringBuilder((colNames + (carbonMeasureList.get(index).getColName())) + ",");
-          }
-        }
-      }
-      projection = colNames.toString().trim();
-      configuration.set("hive.io.file.readcolumn.names", colNames.toString());
-    }
+    String projection = getProjection(configuration, carbonTable,
+        identifier.getCarbonTableIdentifier().getTableName());
     CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
     QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
     // set the filter to the query model in order to filter blocklet before scan
@@ -115,6 +137,45 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
     return queryModel;
   }
 
+  /**
+   * Returns the projection for the Carbon query.
+   *
+   * @param configuration job configuration carrying the Hive projection, if any
+   * @param carbonTable   table whose schema validates the projected columns
+   * @param tableName     name of the table being queried
+   * @return comma-separated list of projected column names
+   */
+  private String getProjection(Configuration configuration, CarbonTable carbonTable,
+      String tableName) {
+    // query plan includes projection column
+    String projection = getColumnProjection(configuration);
+    if (projection == null) {
+      projection = configuration.get("hive.io.file.readcolumn.names");
+    }
+    List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(tableName);
+    List<String> carbonColumnNames = new ArrayList<>();
+    StringBuilder allColumns = new StringBuilder();
+    StringBuilder projectionColumns = new StringBuilder();
+    for (CarbonColumn column : carbonColumns) {
+      carbonColumnNames.add(column.getColName());
+      allColumns.append(column.getColName() + ",");
+    }
+
+    if (!projection.equals("")) {
+      String[] columnNames = projection.split(",");
+      // verify that the columns parsed by Hive exist in the table
+      for (String col : columnNames) {
+        // skip columns (e.g. passed in by a SHOW COLUMNS call) that the table lacks
+        if (carbonColumnNames.contains(col)) {
+          projectionColumns.append(col + ",");
+        }
+      }
+      return projectionColumns.substring(0, projectionColumns.lastIndexOf(","));
+    } else {
+      return allColumns.substring(0, allColumns.lastIndexOf(","));
+    }
+  }
+
   @Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
     return true;
   }

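For readers tracing the refactor above, a minimal standalone sketch of how the new projection pruning behaves (this is not the committed code; the class and column names are hypothetical): columns requested by Hive that are absent from the table are dropped, and an empty projection falls back to all table columns.

import java.util.Arrays;
import java.util.List;

public class ProjectionSketch {
  // Simplified mirror of getProjection: keep only projected columns that the
  // table actually contains; an empty projection means "all columns".
  static String resolveProjection(String projection, List<String> tableColumns) {
    if (projection == null || projection.isEmpty()) {
      return String.join(",", tableColumns);
    }
    StringBuilder kept = new StringBuilder();
    for (String col : projection.split(",")) {
      if (tableColumns.contains(col)) {
        if (kept.length() > 0) {
          kept.append(',');
        }
        kept.append(col);
      }
    }
    return kept.toString();
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("id", "name", "salary");
    System.out.println(resolveProjection("name,rowid,salary", cols)); // name,salary
    System.out.println(resolveProjection("", cols));                  // id,name,salary
  }
}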
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cbe14197/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
index d8705f8..ae931fb 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
@@ -130,6 +130,7 @@ public class HiveEmbeddedServer2 {
     conf.set("hive.added.files.path", "");
     conf.set("hive.added.archives.path", "");
     conf.set("fs.default.name", "file:///");
+    conf.set(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname, "false");
 
     // clear mapred.job.tracker - Hadoop defaults to 'local' if not defined. Hive however expects
     // this to be set to 'local' - if it's not, it does a remote execution (i.e. no child JVM)

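The one-line change above keeps Hive's local (map-join) tasks in the current JVM. A hedged sketch of the setting in isolation; the stated motivation (that in-process tasks inherit the embedded server's configuration and classpath) is an assumption, not taken from the commit:

import org.apache.hadoop.hive.conf.HiveConf;

public class LocalTaskConfSketch {
  public static void main(String[] args) {
    // Run Hive local tasks in-process instead of forking a child JVM
    // (assumption: so they see the embedded server's in-memory configuration).
    HiveConf conf = new HiveConf();
    conf.set(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname, "false");
    System.out.println(conf.get(HiveConf.ConfVars.SUBMITLOCALTASKVIACHILD.varname)); // false
  }
}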

[4/5] carbondata git commit: [CARBONDATA-1283] Carbon should continue with default value for wrong value in configured property

Posted by ja...@apache.org.
[CARBONDATA-1283] Carbon should continue with default value for wrong value in configured property

This closes #1155


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1a35cfb9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1a35cfb9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1a35cfb9

Branch: refs/heads/encoding_override
Commit: 1a35cfb90d0f4a4da05ec80f7a5c192f6832b36d
Parents: cbe1419
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Mon Jul 10 17:47:16 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Jul 12 19:28:44 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  14 ++
 .../core/datastore/impl/FileFactory.java        |   7 +-
 .../carbondata/core/locks/HdfsFileLock.java     |   5 +-
 .../carbondata/core/util/CarbonProperties.java  | 130 ++++++++++++++-
 .../apache/carbondata/core/util/CarbonUtil.java |  13 +-
 .../core/CarbonPropertiesValidationTest.java    | 164 +++++++++++++++++++
 6 files changed, 314 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8110abb..ccb6344 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -143,6 +143,11 @@ public final class CarbonCommonConstants {
    * VIEWFSURL_PREFIX
    */
   public static final String VIEWFSURL_PREFIX = "viewfs://";
+
+  /**
+   * ALLUXIOURL_PREFIX
+   */
+  public static final String ALLUXIOURL_PREFIX = "alluxio://";
   /**
    * FS_DEFAULT_FS
    */
@@ -329,6 +334,15 @@ public final class CarbonCommonConstants {
    */
   public static final String CSV_READ_BUFFER_SIZE_DEFAULT = "50000";
   /**
+   * min value for csv read buffer size
+   */
+  public static final int CSV_READ_BUFFER_SIZE_MIN = 10240; // 10 KB
+  /**
+   * max value for csv read buffer size
+   */
+  public static final int CSV_READ_BUFFER_SIZE_MAX = 10485760; // 10 MB
+
+  /**
    * CSV_READ_COPIES
    */
   public static final String DEFAULT_NUMBER_CORES = "2";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 2a35ab3..2794470 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.filesystem.*;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -74,13 +75,13 @@ public final class FileFactory {
   }
 
   public static FileType getFileType(String path) {
-    if (path.startsWith(CarbonUtil.HDFS_PREFIX)) {
+    if (path.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
       return FileType.HDFS;
     }
-    else if (path.startsWith(CarbonUtil.ALLUXIO_PREFIX)) {
+    else if (path.startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
       return FileType.ALLUXIO;
     }
-    else if (path.startsWith(CarbonUtil.VIEWFS_PREFIX)) {
+    else if (path.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
       return FileType.VIEWFS;
     }
     return FileType.LOCAL;

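A standalone sketch of the prefix dispatch after this change (the enum and prefix literals mirror the diff; the paths are hypothetical). Any path without a recognized scheme falls back to LOCAL:

public class FileTypeSketch {
  enum FileType { HDFS, ALLUXIO, VIEWFS, LOCAL }

  // Same prefix-based dispatch as FileFactory.getFileType above.
  static FileType getFileType(String path) {
    if (path.startsWith("hdfs://")) {
      return FileType.HDFS;
    } else if (path.startsWith("alluxio://")) {
      return FileType.ALLUXIO;
    } else if (path.startsWith("viewfs://")) {
      return FileType.VIEWFS;
    }
    return FileType.LOCAL;
  }

  public static void main(String[] args) {
    System.out.println(getFileType("alluxio://master:19998/store")); // ALLUXIO
    System.out.println(getFileType("/tmp/carbon/store"));            // LOCAL
  }
}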
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index 94e7307..326f8ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -54,8 +54,9 @@ public class HdfsFileLock extends AbstractCarbonLock {
     // If can not get the STORE_LOCATION, then use hadoop.tmp.dir .
     tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION,
                System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
-    if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
-          && !tmpPath.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX)) {
+    if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) && !tmpPath
+        .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) && !tmpPath
+        .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX)) {
       tmpPath = hdfsPath + tmpPath;
     }
   }

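The widened prefix check above is easiest to read in isolation; a hedged sketch (the class and helper names and the example base path are illustrative, not from the source):

public class LockPathSketch {
  // Paths already on a distributed file system (hdfs://, viewfs://, alluxio://)
  // are used verbatim; anything else is treated as relative and prefixed with
  // the HDFS base path, so the lock file always lands on the shared store.
  static String normalizeLockPath(String hdfsBase, String tmpPath) {
    boolean onDistributedFs = tmpPath.startsWith("hdfs://")
        || tmpPath.startsWith("viewfs://")
        || tmpPath.startsWith("alluxio://");
    return onDistributedFs ? tmpPath : hdfsBase + tmpPath;
  }

  public static void main(String[] args) {
    System.out.println(normalizeLockPath("hdfs://nn:9000", "/tmp/locks"));
    // -> hdfs://nn:9000/tmp/locks
    System.out.println(normalizeLockPath("hdfs://nn:9000", "alluxio://m:19998/locks"));
    // -> alluxio://m:19998/locks (unchanged)
  }
}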
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index c1e70ff..c9dd1ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -33,6 +33,8 @@ import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
+import org.apache.hadoop.conf.Configuration;
+
 public final class CarbonProperties {
   /**
    * Attribute for Carbon LOGGER.
@@ -98,6 +100,124 @@ public final class CarbonProperties {
     validateBlockletGroupSizeInMB();
     validateNumberOfColumnPerIORead();
     validateNumberOfRowsPerBlockletColumnPage();
+    validateEnableUnsafeSort();
+    validateCustomBlockDistribution();
+    validateEnableVectorReader();
+    validateLockType();
+    validateCarbonCSVReadBufferSizeByte();
+  }
+
+  private void validateCarbonCSVReadBufferSizeByte() {
+    String csvReadBufferSizeStr =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    if (null != csvReadBufferSizeStr) {
+      try {
+        int bufferSize = Integer.parseInt(csvReadBufferSizeStr);
+        if (bufferSize < CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MIN
+            || bufferSize > CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MAX) {
+          LOGGER.warn("The value \"" + csvReadBufferSizeStr + "\" configured for key \""
+              + CarbonCommonConstants.CSV_READ_BUFFER_SIZE
+              + "\" is not in range. Valid range is \""
+              + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MIN + "\" to \""
+              + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_MAX
+              + "\" bytes. Using the default value \""
+              + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT + "\"");
+          carbonProperties.setProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+              CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT);
+        }
+      } catch (NumberFormatException nfe) {
+        LOGGER.warn("The value \"" + csvReadBufferSizeStr + "\" configured for key \""
+            + CarbonCommonConstants.CSV_READ_BUFFER_SIZE
+            + "\" is invalid. Using the default value \""
+            + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT + "\"");
+        carbonProperties.setProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
+            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT);
+      }
+    }
+  }
+
+  private void validateLockType() {
+    String lockTypeConfigured = carbonProperties.getProperty(CarbonCommonConstants.LOCK_TYPE);
+    if (null != lockTypeConfigured) {
+      switch (lockTypeConfigured.toUpperCase()) {
+        // If the user configured CARBON_LOCK_TYPE_ZOOKEEPER, no validation is needed.
+        // Otherwise validate against the file system type: a local file system expects
+        // CARBON_LOCK_TYPE_LOCAL and a distributed one CARBON_LOCK_TYPE_HDFS.
+        case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
+          break;
+        case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
+        case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
+        default:
+          validateAndConfigureLockType(lockTypeConfigured);
+      }
+    } else {
+      validateAndConfigureLockType(lockTypeConfigured);
+    }
+  }
+
+  /**
+   * Decides and sets the lock type based on the configured file system:
+   * CARBON_LOCK_TYPE_HDFS for distributed file systems, CARBON_LOCK_TYPE_LOCAL
+   * for the local file system.
+   *
+   * @param lockTypeConfigured the lock type configured by the user, may be null
+   */
+  private void validateAndConfigureLockType(String lockTypeConfigured) {
+    Configuration configuration = new Configuration(true);
+    String defaultFs = configuration.get("fs.defaultFS");
+    if (null != defaultFs && (defaultFs.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)
+        || defaultFs.startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || defaultFs
+        .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX))
+        && !CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS.equalsIgnoreCase(lockTypeConfigured)) {
+      LOGGER.warn("The value \"" + lockTypeConfigured + "\" configured for key "
+          + CarbonCommonConstants.LOCK_TYPE + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS);
+      carbonProperties.setProperty(CarbonCommonConstants.LOCK_TYPE,
+          CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS);
+    } else if (null != defaultFs && defaultFs.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX)
+        && !CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL.equalsIgnoreCase(lockTypeConfigured)) {
+      carbonProperties.setProperty(CarbonCommonConstants.LOCK_TYPE,
+          CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL);
+      LOGGER.warn("The value \"" + lockTypeConfigured + "\" configured for key "
+          + CarbonCommonConstants.LOCK_TYPE
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL);
+    }
+  }
+
+  private void validateEnableVectorReader() {
+    String vectorReaderStr =
+        carbonProperties.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER);
+    boolean isValidBooleanValue = CarbonUtil.validateBoolean(vectorReaderStr);
+    if (!isValidBooleanValue) {
+      LOGGER.warn("The enable vector reader value \"" + vectorReaderStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT);
+    }
+  }
+
+  private void validateCustomBlockDistribution() {
+    String customBlockDistributionStr =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION);
+    boolean isValidBooleanValue = CarbonUtil.validateBoolean(customBlockDistributionStr);
+    if (!isValidBooleanValue) {
+      LOGGER.warn("The custom block distribution value \"" + customBlockDistributionStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT);
+    }
+  }
+
+  private void validateEnableUnsafeSort() {
+    String unSafeSortStr = carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
+    boolean isValidBooleanValue = CarbonUtil.validateBoolean(unSafeSortStr);
+    if (!isValidBooleanValue) {
+      LOGGER.warn("The enable unsafe sort value \"" + unSafeSortStr
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT);
+      carbonProperties.setProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT);
+    }
   }
 
   private void initPropertySet() throws IllegalAccessException {
@@ -330,12 +450,10 @@ public final class CarbonProperties {
   }
 
   private void validateHighCardinalityIdentify() {
-    String highcardIdentifyStr = carbonProperties
-        .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
-            CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
-    try {
-      Boolean.parseBoolean(highcardIdentifyStr);
-    } catch (NumberFormatException e) {
+    String highcardIdentifyStr =
+        carbonProperties.getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE);
+    boolean validateBoolean = CarbonUtil.validateBoolean(highcardIdentifyStr);
+    if (!validateBoolean) {
       LOGGER.info("The high cardinality identify value \"" + highcardIdentifyStr
           + "\" is invalid. Using the default value \""
           + CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);

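All of the validators added above follow the same pattern: log a warning and fall back to the default instead of failing the job. A self-contained sketch of that pattern for the CSV read buffer size (the property key string is an assumption; the bounds and default mirror the constants added in this commit):

import java.util.Properties;

public class ValidateSketch {
  static final String KEY = "carbon.csv.read.buffersize.byte"; // assumed key name
  static final int MIN = 10240;          // CSV_READ_BUFFER_SIZE_MIN
  static final int MAX = 10485760;       // CSV_READ_BUFFER_SIZE_MAX
  static final String DEFAULT = "50000"; // CSV_READ_BUFFER_SIZE_DEFAULT

  static void validateCsvReadBufferSize(Properties props) {
    String value = props.getProperty(KEY);
    if (value == null) {
      return; // unset: the default is applied at read time
    }
    try {
      int size = Integer.parseInt(value);
      if (size < MIN || size > MAX) {
        props.setProperty(KEY, DEFAULT); // out of range: reset to default
      }
    } catch (NumberFormatException e) {
      props.setProperty(KEY, DEFAULT);   // not a number: reset to default
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(KEY, "xyz");
    validateCsvReadBufferSize(props);
    System.out.println(props.getProperty(KEY)); // 50000
  }
}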
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 06b2a61..1b08263 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -84,11 +84,6 @@ import org.apache.thrift.transport.TIOStreamTransport;
 
 public final class CarbonUtil {
 
-  public static final String HDFS_PREFIX = "hdfs://";
-  public static final String VIEWFS_PREFIX = "viewfs://";
-  public static final String ALLUXIO_PREFIX = "alluxio://";
-  private static final String FS_DEFAULT_FS = "fs.defaultFS";
-
   /**
    * Attribute for Carbon LOGGER
    */
@@ -697,7 +692,7 @@ public final class CarbonUtil {
    */
   public static String checkAndAppendHDFSUrl(String filePath) {
     String currentPath = filePath;
-    String defaultFsUrl = conf.get(FS_DEFAULT_FS);
+    String defaultFsUrl = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
     String baseDFSUrl = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_DDL_BASE_HDFS_URL, "");
     if (checkIfPrefixExists(filePath)) {
@@ -721,8 +716,10 @@ public final class CarbonUtil {
 
   private static boolean checkIfPrefixExists(String path) {
     final String lowerPath = path.toLowerCase();
-    return lowerPath.startsWith(HDFS_PREFIX) || lowerPath.startsWith(VIEWFS_PREFIX) || lowerPath
-        .startsWith("file://") || lowerPath.startsWith(ALLUXIO_PREFIX);
+    return lowerPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.VIEWFSURL_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX) || lowerPath
+        .startsWith(CarbonCommonConstants.ALLUXIOURL_PREFIX);
   }
 
   public static String getCarbonStorePath() {

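A hedged sketch of the consolidated prefix check (the prefix literals assume the usual values of the constants referenced above):

public class PrefixCheckSketch {
  // A path counts as fully qualified if it carries any known scheme;
  // otherwise checkAndAppendHDFSUrl prepends the configured base DFS URL.
  static boolean checkIfPrefixExists(String path) {
    String lower = path.toLowerCase();
    return lower.startsWith("hdfs://") || lower.startsWith("viewfs://")
        || lower.startsWith("file://") || lower.startsWith("alluxio://");
  }

  public static void main(String[] args) {
    System.out.println(checkIfPrefixExists("alluxio://master:19998/store")); // true
    System.out.println(checkIfPrefixExists("/opt/carbon/store"));            // false
  }
}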
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1a35cfb9/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
new file mode 100644
index 0000000..e0262dc
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import junit.framework.TestCase;
+import org.junit.Test;
+
+/**
+ * Tests that invalid values for carbon common configuration properties
+ * fall back to their defaults.
+ */
+public class CarbonPropertiesValidationTest extends TestCase {
+
+  CarbonProperties carbonProperties;
+
+  @Override public void setUp() throws Exception {
+    carbonProperties = CarbonProperties.getInstance();
+  }
+
+  @Test public void testValidateLockType()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType = carbonProperties.getClass().getDeclaredMethod("validateLockType");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.LOCK_TYPE, "xyz");
+    String valueBeforeValidation = carbonProperties.getProperty(CarbonCommonConstants.LOCK_TYPE);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation = carbonProperties.getProperty(CarbonCommonConstants.LOCK_TYPE);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL.equalsIgnoreCase(valueAfterValidation));
+  }
+
+  @Test public void testValidateEnableUnsafeSort()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType =
+        carbonProperties.getClass().getDeclaredMethod("validateEnableUnsafeSort");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "xyz");
+    String valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+  }
+
+  @Test public void testValidateCustomBlockDistribution()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType =
+        carbonProperties.getClass().getDeclaredMethod("validateCustomBlockDistribution");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, "xyz");
+    String valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT
+        .equalsIgnoreCase(valueAfterValidation));
+  }
+
+  @Test public void testValidateEnableVectorReader()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType =
+        carbonProperties.getClass().getDeclaredMethod("validateEnableVectorReader");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "xyz");
+    String valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(
+        CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+  }
+
+  @Test public void testValidateCarbonCSVReadBufferSizeByte()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType =
+        carbonProperties.getClass().getDeclaredMethod("validateCarbonCSVReadBufferSizeByte");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "xyz");
+    String valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(
+        CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+  }
+
+  @Test public void testValidateCarbonCSVReadBufferSizeByteRange()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType =
+        carbonProperties.getClass().getDeclaredMethod("validateCarbonCSVReadBufferSizeByte");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "10485761");
+    String valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(
+        CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+    carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "10240");
+    valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    validateMethodType.invoke(carbonProperties);
+    valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    assertTrue(valueBeforeValidation.equals(valueAfterValidation));
+    carbonProperties.addProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE, "10239");
+    valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    validateMethodType.invoke(carbonProperties);
+    valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.CSV_READ_BUFFER_SIZE);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(
+        CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT.equalsIgnoreCase(valueAfterValidation));
+  }
+
+  @Test public void testValidateHighCardinalityIdentify()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method validateMethodType =
+        carbonProperties.getClass().getDeclaredMethod("validateHighCardinalityIdentify");
+    validateMethodType.setAccessible(true);
+    carbonProperties.addProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE, "xyz");
+    String valueBeforeValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE);
+    validateMethodType.invoke(carbonProperties);
+    String valueAfterValidation =
+        carbonProperties.getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE);
+    assertTrue(!valueBeforeValidation.equals(valueAfterValidation));
+    assertTrue(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT
+        .equalsIgnoreCase(valueAfterValidation));
+  }
+}


[5/5] carbondata git commit: Update supported-data-types-in-carbondata.md

Posted by ja...@apache.org.
Update supported-data-types-in-carbondata.md

This closes #1165


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e4da2a6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e4da2a6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e4da2a6

Branch: refs/heads/encoding_override
Commit: 9e4da2a6caeef6acd637e49a29c70d2eedd4a504
Parents: 1a35cfb
Author: chenerlu <ch...@huawei.com>
Authored: Wed Jul 12 18:50:02 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed Jul 12 22:04:50 2017 +0800

----------------------------------------------------------------------
 docs/supported-data-types-in-carbondata.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e4da2a6/docs/supported-data-types-in-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/supported-data-types-in-carbondata.md b/docs/supported-data-types-in-carbondata.md
index 8f271e3..561248c 100644
--- a/docs/supported-data-types-in-carbondata.md
+++ b/docs/supported-data-types-in-carbondata.md
@@ -35,6 +35,7 @@
   * String Types
     * STRING
     * CHAR
+    * VARCHAR
 
   * Complex Types
     * arrays: ARRAY``<data_type>``