You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/29 14:35:30 UTC

[1/3] incubator-carbondata git commit: WIP provide dictionary server/client framework

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 241f45f8a -> 20a0b9ec5


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 38f8c79..6d9be67 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -23,7 +23,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
@@ -34,11 +36,15 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDime
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
 import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary;
 import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
@@ -116,8 +122,10 @@ public class PrimitiveDataType implements GenericDataType<Object> {
    * @param columnId
    */
   public PrimitiveDataType(String name, String parentname, String columnId,
-      CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier) {
+                           CarbonDimension carbonDimension,
+                           Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+                           CarbonTableIdentifier carbonTableIdentifier,
+                           DictionaryClient client, Boolean useOnePass, String storePath) {
     this.name = name;
     this.parentname = parentname;
     this.columnId = columnId;
@@ -130,8 +138,33 @@ public class PrimitiveDataType implements GenericDataType<Object> {
         dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(carbonDimension.getDataType()));
       } else {
-        Dictionary dictionary = cache.get(identifier);
-        dictionaryGenerator = new PreCreatedDictionary(dictionary);
+        Dictionary dictionary = null;
+        if (useOnePass) {
+          if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+            try {
+              dictionary = cache.get(identifier);
+            } catch (CarbonUtilException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          String threadNo = "initial";
+          DictionaryKey dictionaryKey = new DictionaryKey();
+          dictionaryKey.setColumnName(carbonDimension.getColName());
+          dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
+          dictionaryKey.setThreadNo(threadNo);
+          // for table initialization
+          dictionaryKey.setType("TABLE_INTIALIZATION");
+          dictionaryKey.setData("0");
+          client.getDictionary(dictionaryKey);
+          Map<Object, Integer> localCache = new HashMap<>();
+          // for generate dictionary
+          dictionaryKey.setType("DICTIONARY_GENERATION");
+          dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
+                  dictionaryKey, localCache);
+        } else {
+          dictionary = cache.get(identifier);
+          dictionaryGenerator = new PreCreatedDictionary(dictionary);
+        }
       }
     } catch (CarbonUtilException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 9e8f5b0..b7c17dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -134,6 +134,21 @@ public class CarbonLoadModel implements Serializable {
   private String rddIteratorKey;
 
   /**
+   *  Use one pass to generate dictionary
+   */
+  private boolean useOnePass;
+
+  /**
+   * dictionary server host
+   */
+  private String dictionaryServerHost;
+
+  /**
+   * dictionary sever port
+   */
+  private int dictionaryServerPort;
+
+  /**
    * get escape char
    * @return
    */
@@ -336,6 +351,9 @@ public class CarbonLoadModel implements Serializable {
     copy.commentChar = commentChar;
     copy.maxColumns = maxColumns;
     copy.storePath = storePath;
+    copy.useOnePass = useOnePass;
+    copy.dictionaryServerHost = dictionaryServerHost;
+    copy.dictionaryServerPort = dictionaryServerPort;
     return copy;
   }
 
@@ -380,6 +398,9 @@ public class CarbonLoadModel implements Serializable {
     copyObj.dateFormat = dateFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.storePath = storePath;
+    copyObj.useOnePass = useOnePass;
+    copyObj.dictionaryServerHost = dictionaryServerHost;
+    copyObj.dictionaryServerPort = dictionaryServerPort;
     return copyObj;
   }
 
@@ -609,4 +630,28 @@ public class CarbonLoadModel implements Serializable {
     this.rddIteratorKey = rddIteratorKey;
 
   }
+
+  public boolean getUseOnePass() {
+    return useOnePass;
+  }
+
+  public void setUseOnePass(boolean useOnePass) {
+    this.useOnePass = useOnePass;
+  }
+
+  public int getDictionaryServerPort() {
+    return dictionaryServerPort;
+  }
+
+  public void setDictionaryServerPort(int dictionaryServerPort) {
+    this.dictionaryServerPort = dictionaryServerPort;
+  }
+
+  public String getDictionaryServerHost() {
+    return dictionaryServerHost;
+  }
+
+  public void setDictionaryServerHost(String dictionaryServerHost) {
+    this.dictionaryServerHost = dictionaryServerHost;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 79e17e2..7450b1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -41,6 +41,24 @@ public class CarbonDataLoadConfiguration {
 
   private Map<String, Object> dataLoadProperties = new HashMap<>();
 
+  /**
+   *  Use one pass to generate dictionary
+   */
+  private boolean useOnePass;
+
+  /**
+   * dictionary server host
+   */
+  private String dictionaryServerHost;
+
+  /**
+   * dictionary sever port
+   */
+  private int dictionaryServerPort;
+
+  public CarbonDataLoadConfiguration() {
+  }
+
   public int getDimensionCount() {
     int dimCount = 0;
     for (int i = 0; i < dataFields.length; i++) {
@@ -144,4 +162,28 @@ public class CarbonDataLoadConfiguration {
   public void setBucketingInfo(BucketingInfo bucketingInfo) {
     this.bucketingInfo = bucketingInfo;
   }
+
+  public boolean getUseOnePass() {
+    return useOnePass;
+  }
+
+  public void setUseOnePass(boolean useOnePass) {
+    this.useOnePass = useOnePass;
+  }
+
+  public String getDictionaryServerHost() {
+    return dictionaryServerHost;
+  }
+
+  public void setDictionaryServerHost(String dictionaryServerHost) {
+    this.dictionaryServerHost = dictionaryServerHost;
+  }
+
+  public int getDictionaryServerPort() {
+    return dictionaryServerPort;
+  }
+
+  public void setDictionaryServerPort(int dictionaryServerPort) {
+    this.dictionaryServerPort = dictionaryServerPort;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 63147c9..f87b4bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -43,6 +43,8 @@ import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
 /**
  * It builds the pipe line of steps for loading data to carbon.
  */
@@ -124,6 +126,10 @@ public final class DataLoadProcessBuilder {
           CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
       csvFileName = csvFile.getName();
       csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile);
+      // if firstRow = " ", then throw exception
+      if (StringUtils.isNotEmpty(csvHeader) && StringUtils.isBlank(csvHeader)) {
+        throw new CarbonDataLoadingException("First line of the csv is not valid.");
+      }
       configuration.setHeader(
           CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter()));
     }
@@ -194,6 +200,11 @@ public final class DataLoadProcessBuilder {
     }
     configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
     configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+    // configuration for one pass load: dictionary server info
+    configuration.setUseOnePass(loadModel.getUseOnePass());
+    configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
+    configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
+
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
index 3182a37..82a7bc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
@@ -19,22 +19,27 @@
 
 package org.apache.carbondata.processing.newflow.converter.impl;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.cache.dictionary.*;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
 import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
@@ -54,19 +59,48 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
 
   public DictionaryFieldConverterImpl(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index) {
+      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     this.index = index;
     this.carbonDimension = (CarbonDimension) dataField.getColumn();
     this.nullFormat = nullFormat;
     DictionaryColumnUniqueIdentifier identifier =
         new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
             dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType());
-    try {
-      Dictionary dictionary = cache.get(identifier);
-      dictionaryGenerator = new PreCreatedDictionary(dictionary);
-    } catch (CarbonUtilException e) {
-      LOGGER.error(e);
-      throw new RuntimeException(e);
+
+    Dictionary dictionary = null;
+    // if use one pass, use DictionaryServerClientDictionary
+    if (useOnePass) {
+      if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+        try{
+          dictionary = cache.get(identifier);
+        } catch (CarbonUtilException e) {
+          LOGGER.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+      String threadNo = "initial";
+      DictionaryKey dictionaryKey = new DictionaryKey();
+      dictionaryKey.setColumnName(dataField.getColumn().getColName());
+      dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
+      dictionaryKey.setThreadNo(threadNo);
+      // for table initialization
+      dictionaryKey.setType("TABLE_INTIALIZATION");
+      dictionaryKey.setData("0");
+      client.getDictionary(dictionaryKey);
+      Map<Object, Integer> localCache = new HashMap<>();
+      // for generate dictionary
+      dictionaryKey.setType("DICTIONARY_GENERATION");
+      dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
+              dictionaryKey, localCache);
+    } else {
+      try {
+        dictionary = cache.get(identifier);
+        dictionaryGenerator = new PreCreatedDictionary(dictionary);
+      } catch (CarbonUtilException e) {
+        LOGGER.error(e);
+        throw new RuntimeException(e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index a46b9ba..b065b55 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -60,7 +61,8 @@ public class FieldEncoderFactory {
    */
   public FieldConverter createFieldEncoder(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat) {
+      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimesion()) {
       if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -69,10 +71,11 @@ public class FieldEncoderFactory {
       } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
           !dataField.getColumn().isComplex()) {
         return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
-            index);
+            index, client, useOnePass, storePath);
       } else if (dataField.getColumn().isComplex()) {
         return new ComplexFieldConverterImpl(
-            createComplexType(dataField, cache, carbonTableIdentifier), index);
+            createComplexType(dataField, cache, carbonTableIdentifier,
+                    client, useOnePass, storePath), index);
       } else {
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
       }
@@ -86,9 +89,10 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier) {
+      CarbonTableIdentifier carbonTableIdentifier,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
-        carbonTableIdentifier);
+        carbonTableIdentifier, client, useOnePass, storePath);
   }
 
   /**
@@ -98,7 +102,8 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier) {
+      CarbonTableIdentifier carbonTableIdentifier,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     switch (carbonColumn.getDataType()) {
       case ARRAY:
         List<CarbonDimension> listOfChildDimensions =
@@ -108,7 +113,7 @@ public class FieldEncoderFactory {
             new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
         for (CarbonDimension dimension : listOfChildDimensions) {
           arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier));
+              carbonTableIdentifier, client, useOnePass, storePath));
         }
         return arrayDataType;
       case STRUCT:
@@ -119,7 +124,7 @@ public class FieldEncoderFactory {
             new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
         for (CarbonDimension dimension : dimensions) {
           structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier));
+              carbonTableIdentifier, client, useOnePass, storePath));
         }
         return structDataType;
       case MAP:
@@ -127,7 +132,7 @@ public class FieldEncoderFactory {
       default:
         return new PrimitiveDataType(carbonColumn.getColName(), parentName,
             carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
-            carbonTableIdentifier);
+            carbonTableIdentifier, client, useOnePass, storePath);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index e0a7ceb..e46b86a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -20,12 +20,14 @@ package org.apache.carbondata.processing.newflow.converter.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.*;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -53,6 +55,10 @@ public class RowConverterImpl implements RowConverter {
 
   private BadRecordLogHolder logHolder;
 
+  private DictionaryClient dictClient;
+
+  private ExecutorService executorService;
+
   public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
       BadRecordsLogger badRecordLogger) {
     this.fields = fields;
@@ -73,10 +79,41 @@ public class RowConverterImpl implements RowConverter {
 
     long lruCacheStartTime = System.currentTimeMillis();
 
+    // for one pass load, start the dictionary client
+    if (configuration.getUseOnePass()) {
+      executorService = Executors.newFixedThreadPool(1);
+      Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
+        @Override
+        public DictionaryClient call() throws Exception {
+          Thread.currentThread().setName("Dictionary client");
+          DictionaryClient dictionaryClient = new DictionaryClient();
+          dictionaryClient.startClient(configuration.getDictionaryServerHost(),
+                  configuration.getDictionaryServerPort());
+          return dictionaryClient;
+        }
+      });
+
+      try {
+        // wait for client initialization finished, or will raise null pointer exception
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      try {
+        dictClient = result.get();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (ExecutionException e) {
+        e.printStackTrace();
+      }
+    }
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat);
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
+              dictClient, configuration.getUseOnePass(),
+              configuration.getTableIdentifier().getStorePath());
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -116,6 +153,12 @@ public class RowConverterImpl implements RowConverter {
     }
     // Set the cardinality to configuration, it will be used by further step for mdk key.
     configuration.setDataLoadProperty(DataLoadProcessorConstants.DIMENSION_LENGTHS, cardinality);
+
+    // close dictionary client when finish write
+    if (configuration.getUseOnePass()) {
+      dictClient.shutDown();
+      executorService.shutdownNow();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
new file mode 100644
index 0000000..12c6e3b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.dictionary;
+
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+
+/**
+ * Dictionary implementation along with dictionary server client to get new dictionary values
+ */
+public class DictionaryServerClientDictionary implements BiDictionary<Integer, Object> {
+
+  private Dictionary dictionary;
+
+  private DictionaryClient client;
+
+  private Map<Object, Integer> localCache;
+
+  private DictionaryKey dictionaryKey;
+
+  private int base;
+
+  private Object lock = new Object();
+
+  public DictionaryServerClientDictionary(Dictionary dictionary, DictionaryClient client,
+      DictionaryKey key, Map<Object, Integer> localCache) {
+    this.dictionary = dictionary;
+    this.client = client;
+    this.dictionaryKey = key;
+    this.localCache = localCache;
+    this.base = (dictionary == null ? 0 : dictionary.getDictionaryChunks().getSize() - 1);
+  }
+
+  @Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
+    Integer key = getKey(value);
+    if (key == null) {
+      synchronized (lock) {
+        dictionaryKey.setData(value);
+        dictionaryKey.setThreadNo(Thread.currentThread().getId() + "");
+        DictionaryKey dictionaryValue = client.getDictionary(dictionaryKey);
+        key = (Integer) dictionaryValue.getData();
+        localCache.put(value, key);
+      }
+      return key + base;
+    }
+    return key;
+  }
+
+  @Override public Integer getKey(Object value) {
+    Integer key = -1;
+    if (dictionary != null) {
+      key = dictionary.getSurrogateKey(value.toString());
+    }
+    if (key == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+      key = localCache.get(value);
+      if (key != null) {
+        return key + base;
+      }
+    }
+    return key;
+  }
+
+  @Override public Object getValue(Integer key) {
+    throw new UnsupportedOperationException("Not supported here");
+  }
+
+  @Override public int size() {
+    dictionaryKey.setType("SIZE");
+    int size = (int) client.getDictionary(dictionaryKey).getData()
+            + base;
+    return size;
+  }
+}


[2/3] incubator-carbondata git commit: WIP provide dictionary server/client framework

Posted by ja...@apache.org.
WIP provide dictionary server/client framework

complete the netty communication

netty

implement methods, and use new dictionarygenerator

trigger msg: initial, generate, size

add msg: write dictionary

fix write mutil dictionary file

ObjectDecoder

fixerror

fix query error

remove no used files

shutdown dictionary server and client

fixDictServerCloseBug and format code

testOnePass

fix second load

fix complex

fix multil level complex

optimize DictionaryKey

fix testcase

fix testcase and get dict cache

 fix checkstyle

remove some useless judgement conditions

fix complex test case

fix testcase

fix mutil task

fix print

use json for serialize, and fix write dictionary issues

fix duplicate dictionary

write dictionary once

multi thread write dictionary

add testcase

RemoveUselessCode

fix comments

fix nullpointerexception

optimize

support predef column dictionary

use kryo


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/05b26549
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/05b26549
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/05b26549

Branch: refs/heads/master
Commit: 05b26549e929c2a461b0d18e5b58d3cd61bec1d5
Parents: 241f45f
Author: ravipesala <ra...@gmail.com>
Authored: Tue Nov 15 19:15:17 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Thu Dec 29 20:49:05 2016 +0800

----------------------------------------------------------------------
 .../AbstractColumnDictionaryInfo.java           |   2 +-
 .../dictionary/AbstractDictionaryCache.java     |  25 --
 .../cache/dictionary/ColumnDictionaryInfo.java  |   2 +-
 .../dictionary/ColumnReverseDictionaryInfo.java |   2 +-
 .../cache/dictionary/ForwardDictionary.java     |   4 +-
 .../cache/dictionary/ReverseDictionary.java     |   4 +-
 .../metadata/schema/table/CarbonTable.java      |  62 +++--
 .../core/constants/CarbonCommonConstants.java   |  10 +
 .../dictionary/client/DictionaryClient.java     |  92 +++++++
 .../client/DictionaryClientHandler.java         | 109 +++++++++
 .../dictionary/generator/DictionaryWriter.java  |  29 +++
 .../IncrementalColumnDictionaryGenerator.java   | 241 +++++++++++++++++++
 .../generator/ServerDictionaryGenerator.java    |  74 ++++++
 .../generator/TableDictionaryGenerator.java     | 116 +++++++++
 .../dictionary/generator/key/DictionaryKey.java |  92 +++++++
 .../dictionary/generator/key/KryoRegister.java  |  68 ++++++
 .../dictionary/server/DictionaryServer.java     |  93 +++++++
 .../server/DictionaryServerHandler.java         | 108 +++++++++
 .../apache/carbondata/core/util/CarbonUtil.java |  27 +++
 .../spark/util/GlobalDictionaryUtil.scala       |   4 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  15 ++
 .../execution/command/carbonTableSchema.scala   |  79 +++++-
 .../test/resources/columndictionary/country.csv |   5 +
 .../test/resources/columndictionary/name.csv    |  10 +
 .../spark/src/test/resources/dataIncrement.csv  |  21 ++
 .../complexType/TestCreateTableWithDouble.scala |   2 +
 .../dataload/TestLoadDataWithSinglePass.scala   | 114 +++++++++
 .../filterexpr/FilterProcessorTestCase.scala    |   2 +-
 .../processing/datatypes/PrimitiveDataType.java |  41 +++-
 .../processing/model/CarbonLoadModel.java       |  45 ++++
 .../newflow/CarbonDataLoadConfiguration.java    |  42 ++++
 .../newflow/DataLoadProcessBuilder.java         |  11 +
 .../impl/DictionaryFieldConverterImpl.java      |  52 +++-
 .../converter/impl/FieldEncoderFactory.java     |  23 +-
 .../converter/impl/RowConverterImpl.java        |  45 +++-
 .../DictionaryServerClientDictionary.java       |  95 ++++++++
 37 files changed, 1692 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
index ad766d7..ab400f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractColumnDictionaryInfo.java
@@ -282,7 +282,7 @@ public abstract class AbstractColumnDictionaryInfo implements DictionaryInfo {
    * 2. Filter scenarios where from value surrogate key has to be found.
    *
    * @param value dictionary value
-   * @return if found returns key else 0
+   * @return if found returns key else INVALID_SURROGATE_KEY
    */
   @Override public int getSurrogateKey(String value) {
     byte[] keyData = value.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
index e3e6532..5fbef5d 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/AbstractDictionaryCache.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReader;
 import org.apache.carbondata.core.service.DictionaryService;
 import org.apache.carbondata.core.service.PathService;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 
 /**
@@ -85,30 +84,6 @@ public abstract class AbstractDictionaryCache<K extends DictionaryColumnUniqueId
   }
 
   /**
-   * This method will check if dictionary and its metadata file exists for a given column
-   *
-   * @param dictionaryColumnUniqueIdentifier unique identifier which contains dbName,
-   *                                         tableName and columnIdentifier
-   * @return
-   */
-  protected boolean isFileExistsForGivenColumn(
-      DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) {
-    PathService pathService = CarbonCommonFactory.getPathService();
-    CarbonTablePath carbonTablePath = pathService.getCarbonTablePath(carbonStorePath,
-            dictionaryColumnUniqueIdentifier.getCarbonTableIdentifier());
-
-    String dictionaryFilePath =
-        carbonTablePath.getDictionaryFilePath(dictionaryColumnUniqueIdentifier
-            .getColumnIdentifier().getColumnId());
-    String dictionaryMetadataFilePath =
-        carbonTablePath.getDictionaryMetaFilePath(dictionaryColumnUniqueIdentifier
-            .getColumnIdentifier().getColumnId());
-    // check if both dictionary and its metadata file exists for a given column
-    return CarbonUtil.isFileExists(dictionaryFilePath) && CarbonUtil
-        .isFileExists(dictionaryMetadataFilePath);
-  }
-
-  /**
    * This method will read dictionary metadata file and return the dictionary meta chunks
    *
    * @param dictionaryColumnUniqueIdentifier

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
index 1d2eb8b..cd2a88c 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
@@ -211,7 +211,7 @@ public class ColumnDictionaryInfo extends AbstractColumnDictionaryInfo {
         return surrogateKey; // key found
       }
     }
-    return 0;
+    return CarbonCommonConstants.INVALID_SURROGATE_KEY;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnReverseDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnReverseDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnReverseDictionaryInfo.java
index 03a6d6a..ff4f10a 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnReverseDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnReverseDictionaryInfo.java
@@ -64,7 +64,7 @@ public class ColumnReverseDictionaryInfo extends AbstractColumnDictionaryInfo {
    * 2. Filter scenarios where from value surrogate key has to be found.
    *
    * @param value dictionary value as byte array. It will be treated as key here
-   * @return if found returns key else 0
+   * @return if found returns key else INVALID_SURROGATE_KEY
    */
   @Override public int getSurrogateKey(byte[] value) {
     DictionaryByteArrayWrapper dictionaryByteArrayWrapper =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
index bddab56..7d460d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionary.java
@@ -50,7 +50,7 @@ public class ForwardDictionary implements Dictionary {
    * 2. Filter scenarios where from value surrogate key has to be found.
    *
    * @param value dictionary value
-   * @return if found returns key else 0
+   * @return if found returns key else INVALID_SURROGATE_KEY
    */
   @Override public int getSurrogateKey(String value) {
     return columnDictionaryInfo.getSurrogateKey(value);
@@ -64,7 +64,7 @@ public class ForwardDictionary implements Dictionary {
    * 2. Filter scenarios where from value surrogate key has to be found.
    *
    * @param value dictionary value as byte array
-   * @return if found returns key else 0
+   * @return if found returns key else INVALID_SURROGATE_KEY
    */
   @Override public int getSurrogateKey(byte[] value) {
     return columnDictionaryInfo.getSurrogateKey(value);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
index 1f8a3ff..cce4156 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionary.java
@@ -44,7 +44,7 @@ public class ReverseDictionary implements Dictionary {
    * 2. Filter scenarios where from value surrogate key has to be found.
    *
    * @param value dictionary value
-   * @return if found returns key else 0
+   * @return if found returns key else INVALID_SURROGATE_KEY
    */
   @Override public int getSurrogateKey(String value) {
     return columnReverseDictionaryInfo.getSurrogateKey(value);
@@ -58,7 +58,7 @@ public class ReverseDictionary implements Dictionary {
    * 2. Filter scenarios where from value surrogate key has to be found.
    *
    * @param value dictionary value as byte array
-   * @return if found returns key else 0
+   * @return if found returns key else INVALID_SURROGATE_KEY
    */
   @Override public int getSurrogateKey(byte[] value) {
     return columnReverseDictionaryInfo.getSurrogateKey(value);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
index 7766616..f756ab1 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
@@ -68,6 +68,11 @@ public class CarbonTable implements Serializable {
 
   private Map<String, List<CarbonColumn>> createOrderColumn;
   /**
+   * TableName, Dimensions and children dimensions list
+   */
+  private Map<String, List<CarbonDimension>> tablePrimitiveDimensionsMap;
+
+  /**
    * table measures list.
    */
   private Map<String, List<CarbonMeasure>> tableMeasuresMap;
@@ -108,6 +113,7 @@ public class CarbonTable implements Serializable {
     this.tableBucketMap = new HashMap<>();
     this.aggregateTablesName = new ArrayList<String>();
     this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
+    this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>();
   }
 
   /**
@@ -195,8 +201,10 @@ public class CarbonTable implements Serializable {
    */
   private void fillDimensionsAndMeasuresForTables(TableSchema tableSchema) {
     List<CarbonDimension> dimensions = new ArrayList<CarbonDimension>();
+    List<CarbonDimension> primitiveDimensions = new ArrayList<CarbonDimension>();
     List<CarbonMeasure> measures = new ArrayList<CarbonMeasure>();
     this.tableDimensionsMap.put(tableSchema.getTableName(), dimensions);
+    this.tablePrimitiveDimensionsMap.put(this.tableUniqueName, primitiveDimensions);
     this.tableMeasuresMap.put(tableSchema.getTableName(), measures);
     int dimensionOrdinal = 0;
     int measureOrdinal = 0;
@@ -216,26 +224,32 @@ public class CarbonTable implements Serializable {
           dimensions.add(complexDimension);
           dimensionOrdinal =
               readAllComplexTypeChildrens(dimensionOrdinal, columnSchema.getNumberOfChild(),
-                  listOfColumns, complexDimension);
+                  listOfColumns, complexDimension, primitiveDimensions);
           i = dimensionOrdinal - 1;
           complexTypeOrdinal = assignComplexOrdinal(complexDimension, complexTypeOrdinal);
         } else {
           if (!columnSchema.getEncodingList().contains(Encoding.DICTIONARY)) {
-            dimensions.add(new CarbonDimension(columnSchema, dimensionOrdinal++,
-                   columnSchema.getSchemaOrdinal(), -1, -1, -1));
+            CarbonDimension dimension =
+                    new CarbonDimension(columnSchema, dimensionOrdinal++,
+                            columnSchema.getSchemaOrdinal(), -1, -1, -1);
+            dimensions.add(dimension);
+            primitiveDimensions.add(dimension);
           } else if (columnSchema.getEncodingList().contains(Encoding.DICTIONARY)
               && columnSchema.getColumnGroupId() == -1) {
-            dimensions
-                .add(new CarbonDimension(columnSchema, dimensionOrdinal++,
-                         columnSchema.getSchemaOrdinal(), keyOrdinal++, -1, -1));
+            CarbonDimension dimension =
+                    new CarbonDimension(columnSchema, dimensionOrdinal++,
+                            columnSchema.getSchemaOrdinal(), keyOrdinal++, -1, -1);
+            dimensions.add(dimension);
+            primitiveDimensions.add(dimension);
           } else {
             columnGroupOrdinal =
                 previousColumnGroupId == columnSchema.getColumnGroupId() ? ++columnGroupOrdinal : 0;
             previousColumnGroupId = columnSchema.getColumnGroupId();
-            dimensions.add(new CarbonDimension(columnSchema, dimensionOrdinal++,
-                     columnSchema.getSchemaOrdinal(), keyOrdinal++,
-                columnGroupOrdinal, -1));
-
+            CarbonDimension dimension = new CarbonDimension(columnSchema, dimensionOrdinal++,
+                    columnSchema.getSchemaOrdinal(), keyOrdinal++,
+                    columnGroupOrdinal, -1);
+            dimensions.add(dimension);
+            primitiveDimensions.add(dimension);
           }
         }
       } else {
@@ -256,7 +270,8 @@ public class CarbonTable implements Serializable {
    * @return
    */
   private int readAllComplexTypeChildrens(int dimensionOrdinal, int childCount,
-      List<ColumnSchema> listOfColumns, CarbonDimension parentDimension) {
+      List<ColumnSchema> listOfColumns, CarbonDimension parentDimension,
+                                          List<CarbonDimension> primitiveDimensions) {
     for (int i = 0; i < childCount; i++) {
       ColumnSchema columnSchema = listOfColumns.get(dimensionOrdinal);
       if (columnSchema.isDimensionColumn()) {
@@ -268,11 +283,13 @@ public class CarbonTable implements Serializable {
           parentDimension.getListOfChildDimensions().add(complexDimension);
           dimensionOrdinal =
               readAllComplexTypeChildrens(dimensionOrdinal, columnSchema.getNumberOfChild(),
-                  listOfColumns, complexDimension);
+                  listOfColumns, complexDimension, primitiveDimensions);
         } else {
-          parentDimension.getListOfChildDimensions()
-              .add(new CarbonDimension(columnSchema, dimensionOrdinal++,
-                     columnSchema.getSchemaOrdinal(), -1, -1, -1));
+          CarbonDimension carbonDimension =
+                  new CarbonDimension(columnSchema, dimensionOrdinal++,
+                          columnSchema.getSchemaOrdinal(), -1, -1, -1);
+          parentDimension.getListOfChildDimensions().add(carbonDimension);
+          primitiveDimensions.add(carbonDimension);
         }
       }
     }
@@ -518,4 +535,19 @@ public class CarbonTable implements Serializable {
     this.blockSize = blockSize;
   }
 
+  /**
+   * to get the normal dimension or the primitive dimension of the complex type
+   *
+   * @param tableName
+   * @return primitive dimension of a table
+   */
+  public CarbonDimension getPrimitiveDimensionByName(String tableName, String columnName) {
+    List<CarbonDimension> dimList = tablePrimitiveDimensionsMap.get(tableName);
+    for (CarbonDimension dim : dimList) {
+      if (dim.getColName().equalsIgnoreCase(columnName)) {
+        return dim;
+      }
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 2d1ba9f..f766042 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -981,6 +981,16 @@ public final class CarbonCommonConstants {
 
   public static final String ENABLE_VECTOR_READER_DEFAULT = "false";
 
+  /*
+   * carbon dictionary server port
+   */
+  public static final String DICTIONARY_SERVER_PORT = "carbon.dictionary.server.port";
+
+  /**
+   * Default carbon dictionary server port
+   */
+  public static final String DICTIONARY_SERVER_PORT_DEFAULT = "2030";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
new file mode 100644
index 0000000..92bc56a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.client;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.serialization.ClassResolvers;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+
+
+/**
+ * Dictionary client to connect to Dictionary server and generate dictionary values
+ */
+public class DictionaryClient {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(DictionaryClient.class.getName());
+
+  private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
+
+  private ClientBootstrap clientBootstrap;
+
+  /**
+   * start dictionary client
+   *
+   * @param address
+   * @param port
+   */
+  public void startClient(String address, int port) {
+    clientBootstrap = new ClientBootstrap();
+    ExecutorService boss = Executors.newCachedThreadPool();
+    ExecutorService worker = Executors.newCachedThreadPool();
+    clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
+    clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline pipeline = Channels.pipeline();
+        pipeline.addLast("ObjectEncoder", new ObjectEncoder());
+        pipeline.addLast("ObjectDecoder", new ObjectDecoder(ClassResolvers.cacheDisabled(
+            getClass().getClassLoader())));
+        pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
+        return pipeline;
+      }
+    });
+    clientBootstrap.connect(new InetSocketAddress(address, port));
+    LOGGER.audit("Client Start!");
+  }
+
+  /**
+   * for client request
+   *
+   * @param key
+   * @return
+   */
+  public DictionaryKey getDictionary(DictionaryKey key) {
+    return dictionaryClientHandler.getDictionary(key);
+  }
+
+  /**
+   * shutdown dictionary client
+   */
+  public void shutDown() {
+    clientBootstrap.releaseExternalResources();
+    clientBootstrap.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
new file mode 100644
index 0000000..d5ca781
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.client;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
+
+import org.jboss.netty.channel.*;
+
+/**
+ * Client handler to get data.
+ */
+public class DictionaryClientHandler extends SimpleChannelHandler {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
+
+  final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap<>();
+
+  private ChannelHandlerContext ctx;
+
+  private Object lock = new Object();
+
+  @Override
+  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+    this.ctx = ctx;
+    LOGGER.audit("Connected " + ctx.getHandler());
+    super.channelConnected(ctx, e);
+  }
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+    byte[] response = (byte[]) e.getMessage();
+    DictionaryKey key = KryoRegister.deserialize(response);
+    BlockingQueue<DictionaryKey> dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
+    dictKeyQueue.offer(key);
+    super.messageReceived(ctx, e);
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+    LOGGER.error("exceptionCaught");
+    e.getCause().printStackTrace();
+    ctx.getChannel().close();
+  }
+
+  /**
+   * client send request to server
+   *
+   * @param key
+   * @return
+   */
+  public DictionaryKey getDictionary(DictionaryKey key) {
+    DictionaryKey dictionaryKey;
+    BlockingQueue<DictionaryKey> dictKeyQueue = null;
+    try {
+      synchronized (lock) {
+        dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
+        if (dictKeyQueue == null) {
+          dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
+          dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
+        }
+      }
+      byte[] serialize = KryoRegister.serialize(key);
+      ctx.getChannel().write(serialize);
+    } catch (Exception e) {
+      LOGGER.error("Error while send request to server " + e.getMessage());
+      ctx.getChannel().close();
+    }
+    boolean interrupted = false;
+    try {
+      for (; ; ) {
+        try {
+          dictionaryKey = dictKeyQueue.take();
+          return dictionaryKey;
+        } catch (InterruptedException ignore) {
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/generator/DictionaryWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/DictionaryWriter.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/DictionaryWriter.java
new file mode 100644
index 0000000..8c160dc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/DictionaryWriter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator;
+
+
+/**
+ * Interface to write dictionary to file system
+ */
+public interface DictionaryWriter {
+
+  void writeDictionaryData(String tableUniqueName) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
new file mode 100644
index 0000000..fe84ef5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnIdentifier;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.devapi.DictionaryGenerator;
+import org.apache.carbondata.core.service.DictionaryService;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
+import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
+
+
+/**
+ * This generator does not maintain the whole cache of dictionary. It just maintains the cache only
+ * for the loading session, so what ever the dictionary values it generates in the loading session
+ * it keeps in cache.
+ */
+public class IncrementalColumnDictionaryGenerator implements BiDictionary<Integer, String>,
+        DictionaryGenerator<Integer, String>, DictionaryWriter {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(IncrementalColumnDictionaryGenerator.class.getName());
+
+  private final Object lock = new Object();
+
+  private Map<String, Integer> incrementalCache = new ConcurrentHashMap<>();
+
+  private Map<Integer, String> reverseIncrementalCache = new ConcurrentHashMap<>();
+
+  private int currentDictionarySize;
+
+  private CarbonDimension dimension;
+
+  public IncrementalColumnDictionaryGenerator(CarbonDimension dimension, int maxValue) {
+    this.currentDictionarySize = maxValue;
+    this.dimension = dimension;
+  }
+
+  @Override public Integer getOrGenerateKey(String value) throws DictionaryGenerationException {
+    Integer dict = getKey(value);
+    if (dict == null) {
+      dict = generateKey(value);
+    }
+    return dict;
+  }
+
+  @Override public Integer getKey(String value) {
+    return incrementalCache.get(value);
+  }
+
+  @Override public String getValue(Integer key) {
+    return reverseIncrementalCache.get(key);
+  }
+
+  @Override public int size() {
+    return currentDictionarySize;
+  }
+
+  @Override public Integer generateKey(String value) throws DictionaryGenerationException {
+    synchronized (lock) {
+      Integer dict = incrementalCache.get(value);
+      if (dict == null) {
+        dict = ++currentDictionarySize;
+        incrementalCache.put(value, dict);
+        reverseIncrementalCache.put(dict, value);
+      }
+      return dict;
+    }
+  }
+
+  @Override public void writeDictionaryData(String tableUniqueName) throws Exception {
+    // initialize params
+    CarbonMetadata metadata = CarbonMetadata.getInstance();
+    CarbonTable carbonTable = metadata.getCarbonTable(tableUniqueName);
+    CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier();
+    ColumnIdentifier columnIdentifier = dimension.getColumnIdentifier();
+    String storePath = carbonTable.getStorePath();
+    DictionaryService dictionaryService = CarbonCommonFactory.getDictionaryService();
+    // create dictionary cache from dictionary File
+    DictionaryColumnUniqueIdentifier identifier =
+            new DictionaryColumnUniqueIdentifier(tableIdentifier, columnIdentifier,
+                    columnIdentifier.getDataType());
+    Boolean isDictExists = CarbonUtil.isFileExistsForGivenColumn(storePath, identifier);
+    Dictionary dictionary = null;
+    long t1 = System.currentTimeMillis();
+    if (isDictExists) {
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = CacheProvider.getInstance()
+              .createCache(CacheType.REVERSE_DICTIONARY, storePath);
+      dictionary = dictCache.get(identifier);
+    }
+    long dictCacheTime = System.currentTimeMillis() - t1;
+    long t2 = System.currentTimeMillis();
+    // write dictionary
+    CarbonDictionaryWriter dictionaryWriter = null;
+    dictionaryWriter = dictionaryService
+            .getDictionaryWriter(tableIdentifier, columnIdentifier, storePath);
+    List<String> distinctValues = writeDictionary(dictionaryWriter, isDictExists);
+    long dictWriteTime = System.currentTimeMillis() - t2;
+    long t3 = System.currentTimeMillis();
+    // write sort index
+    if (distinctValues.size() > 0) {
+      writeSortIndex(distinctValues, dictionary,
+              dictionaryService, tableIdentifier, columnIdentifier, storePath);
+    }
+    long sortIndexWriteTime = System.currentTimeMillis() - t3;
+    // update Meta Data
+    updateMetaData(dictionaryWriter);
+    LOGGER.audit("\n columnName: " + dimension.getColName() +
+            "\n columnId: " + dimension.getColumnId() +
+            "\n new distinct values count: " + distinctValues.size() +
+            "\n create dictionary cache: " + dictCacheTime +
+            "\n sort list, distinct and write: " + dictWriteTime +
+            "\n write sort info: " + sortIndexWriteTime);
+  }
+
+  /**
+   * write dictionary to file
+   *
+   * @param dictionaryWriter
+   * @param isDictExists
+   * @return
+   * @throws IOException
+   */
+  private List<String> writeDictionary(CarbonDictionaryWriter dictionaryWriter,
+                                       Boolean isDictExists) throws IOException {
+    List<String> distinctValues = new ArrayList<>();
+    try {
+      if (!isDictExists) {
+        dictionaryWriter.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
+        distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
+      }
+      // write value to dictionary file
+      if (reverseIncrementalCache.size() > 0) {
+        for (int index = 2; index < reverseIncrementalCache.size() + 2; index++) {
+          String value = reverseIncrementalCache.get(index);
+          String parsedValue = DataTypeUtil
+                  .normalizeColumnValueForItsDataType(value, dimension);
+          if (null != parsedValue) {
+            dictionaryWriter.write(parsedValue);
+            distinctValues.add(parsedValue);
+          }
+        }
+      }
+    } catch (IOException ex) {
+      throw ex;
+    } finally {
+      if (null != dictionaryWriter) {
+        dictionaryWriter.close();
+      }
+    }
+
+    return distinctValues;
+  }
+
+  /**
+   * write dictionary sort index to file
+   *
+   * @param distinctValues
+   * @param dictionary
+   * @param dictionaryService
+   * @param tableIdentifier
+   * @param columnIdentifier
+   * @param storePath
+   * @throws IOException
+   */
+  private void writeSortIndex(List<String> distinctValues,
+                              Dictionary dictionary,
+                              DictionaryService dictionaryService,
+                              CarbonTableIdentifier tableIdentifier,
+                              ColumnIdentifier columnIdentifier,
+                              String storePath) throws Exception{
+    CarbonDictionarySortIndexWriter carbonDictionarySortIndexWriter = null;
+    try {
+      CarbonDictionarySortInfoPreparator preparator = new CarbonDictionarySortInfoPreparator();
+      CarbonDictionarySortInfo dictionarySortInfo =
+              preparator.getDictionarySortInfo(distinctValues, dictionary,
+                      dimension.getDataType());
+      carbonDictionarySortIndexWriter =
+              dictionaryService.getDictionarySortIndexWriter(tableIdentifier, columnIdentifier,
+                      storePath);
+      carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex());
+      carbonDictionarySortIndexWriter
+              .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted());
+    } finally {
+      if (null != carbonDictionarySortIndexWriter) {
+        carbonDictionarySortIndexWriter.close();
+      }
+    }
+  }
+
+  /**
+   * update dictionary metadata
+   *
+   * @param dictionaryWriter
+   * @throws IOException
+   */
+  private void updateMetaData(CarbonDictionaryWriter dictionaryWriter) throws IOException{
+    if (null != dictionaryWriter) {
+      dictionaryWriter.commit();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
new file mode 100644
index 0000000..594ec41
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.devapi.DictionaryGenerator;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+
+/**
+ * This is the dictionary generator for all tables. It generates dictionary
+ * based on @{@link DictionaryKey}.
+ */
+public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, DictionaryKey> {
+
+  /**
+   * the map of tableName to TableDictionaryGenerator
+   */
+  private Map<String, TableDictionaryGenerator> tableMap = new ConcurrentHashMap<>();
+
+  @Override public Integer generateKey(DictionaryKey value) throws DictionaryGenerationException {
+    TableDictionaryGenerator generator = tableMap.get(value.getTableUniqueName());
+    assert generator != null : "Table initialization for generator is not done";
+    return generator.generateKey(value);
+  }
+
+  public void initializeGeneratorForTable(DictionaryKey key) {
+    CarbonMetadata metadata = CarbonMetadata.getInstance();
+    CarbonTable carbonTable = metadata.getCarbonTable(key.getTableUniqueName());
+    CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
+            key.getTableUniqueName(), key.getColumnName());
+    // initialize TableDictionaryGenerator first
+    if (tableMap.get(key.getTableUniqueName()) == null) {
+      tableMap.put(key.getTableUniqueName(), new TableDictionaryGenerator(dimension));
+    } else {
+      tableMap.get(key.getTableUniqueName()).updateGenerator(dimension);
+    }
+  }
+
+  public Integer size(DictionaryKey key) {
+    TableDictionaryGenerator generator = tableMap.get(key.getTableUniqueName());
+    assert generator != null : "Table intialization for generator is not done";
+    return generator.size(key);
+  }
+
+  public void writeDictionaryData() throws Exception {
+    for (String tableUniqueName: tableMap.keySet()) {
+      TableDictionaryGenerator generator = tableMap.get(tableUniqueName);
+      ((DictionaryWriter) generator).writeDictionaryData(tableUniqueName);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
new file mode 100644
index 0000000..addc3b6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.devapi.DictionaryGenerator;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Dictionary generation for table.
+ */
+public class TableDictionaryGenerator
+    implements DictionaryGenerator<Integer, DictionaryKey>, DictionaryWriter {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(TableDictionaryGenerator.class.getName());
+
+  /**
+   * the map of columnName to dictionaryGenerator
+   */
+  private Map<String, DictionaryGenerator<Integer, String>> columnMap = new ConcurrentHashMap<>();
+
+  public TableDictionaryGenerator(CarbonDimension dimension) {
+    columnMap.put(dimension.getColumnId(),
+            new IncrementalColumnDictionaryGenerator(dimension, 1));
+  }
+
+  @Override public Integer generateKey(DictionaryKey value) throws DictionaryGenerationException {
+    CarbonMetadata metadata = CarbonMetadata.getInstance();
+    CarbonTable carbonTable = metadata.getCarbonTable(value.getTableUniqueName());
+    CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
+            value.getTableUniqueName(), value.getColumnName());
+
+    DictionaryGenerator<Integer, String> generator =
+            columnMap.get(dimension.getColumnId());
+    return generator.generateKey(value.getData().toString());
+  }
+
+  public Integer size(DictionaryKey key) {
+    CarbonMetadata metadata = CarbonMetadata.getInstance();
+    CarbonTable carbonTable = metadata.getCarbonTable(key.getTableUniqueName());
+    CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
+            key.getTableUniqueName(), key.getColumnName());
+
+    DictionaryGenerator<Integer, String> generator =
+            columnMap.get(dimension.getColumnId());
+    return ((BiDictionary) generator).size();
+  }
+
+  @Override public void writeDictionaryData(String tableUniqueName) throws Exception {
+    int numOfCores = 1;
+    final String tableName = tableUniqueName;
+    try {
+      numOfCores = Integer.parseInt(CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
+                      CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+    } catch (NumberFormatException e) {
+      numOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+    long start = System.currentTimeMillis();
+    List<Future<Void>> taskSubmitList =
+            new ArrayList<>(columnMap.size());
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfCores);
+    for (final DictionaryGenerator generator: columnMap.values()) {
+      taskSubmitList.add(executorService.submit(new Callable<Void>() {
+        @Override public Void call() throws Exception {
+          ((DictionaryWriter) (generator)).writeDictionaryData(tableName);
+          return null;
+        }
+      }));
+    }
+
+    try {
+      executorService.shutdown();
+      executorService.awaitTermination(1, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error loading the dictionary: " + e.getMessage());
+    }
+    LOGGER.audit("Total time taken to write dictionary file is: " +
+            (System.currentTimeMillis() - start));
+  }
+
+  public void updateGenerator(CarbonDimension dimension) {
+    columnMap.put(dimension.getColumnId(),
+            new IncrementalColumnDictionaryGenerator(dimension, 1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
new file mode 100644
index 0000000..dc4bfc3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator.key;
+
+import java.io.Serializable;
+
+/**
+ * Dictionary key to generate dictionary
+ */
+public class DictionaryKey implements Serializable {
+
+  /**
+   * tableUniqueName
+   */
+  private String tableUniqueName;
+
+  /**
+   * columnName
+   */
+  private String columnName;
+
+  /**
+   * message data
+   */
+  private Object data;
+
+  /**
+   * message type
+   */
+  private String type;
+
+  /**
+   * dictionary client thread no
+   */
+  private String threadNo;
+
+  public String getTableUniqueName() {
+    return tableUniqueName;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  public Object getData() {
+    return data;
+  }
+
+  public void setData(Object data) {
+    this.data = data;
+  }
+
+  public void setThreadNo(String threadNo) {
+    this.threadNo = threadNo;
+  }
+
+  public String getThreadNo() {
+    return this.threadNo;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public void setTableUniqueName(String tableUniqueName) {
+    this.tableUniqueName = tableUniqueName;
+  }
+
+  public void setColumnName(String columnName) {
+    this.columnName = columnName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
new file mode 100644
index 0000000..d66b79a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator.key;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Registration;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class KryoRegister {
+  /**
+   * deserialize byte to DictionaryKey
+   *
+   * @param bb
+   * @return
+   */
+  public static DictionaryKey deserialize(byte[] bb) {
+    Kryo kryo = new Kryo();
+    kryo.setReferences(false);
+    kryo.setRegistrationRequired(true);
+    // register
+    Registration registration = kryo.register(DictionaryKey.class);
+
+    // deserialize
+    Input input = null;
+    input = new Input(bb);
+    DictionaryKey key = (DictionaryKey) kryo.readObject(input, registration.getType());
+    input.close();
+    return key;
+  }
+
+  /**
+   * serialize DictionaryKey to byte
+   *
+   * @param key
+   * @return
+   */
+  public static byte[] serialize(DictionaryKey key) {
+    Kryo kryo = new Kryo();
+    kryo.setReferences(false);
+    kryo.setRegistrationRequired(true);
+    // register
+    Registration registration = kryo.register(DictionaryKey.class);
+    //serialize
+    Output output = null;
+    output = new Output(1, 4096);
+    kryo.writeObject(output, key);
+    byte[] bb = output.toBytes();
+    output.flush();
+    return bb;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
new file mode 100644
index 0000000..bb0e8f8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.server;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.serialization.ClassResolvers;
+import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
+import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+
+
+/**
+ * Dictionary Server to generate dictionary keys.
+ */
+public class DictionaryServer {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(DictionaryServer.class.getName());
+
+  private ServerBootstrap bootstrap;
+
+  private DictionaryServerHandler dictionaryServerHandler;
+
+  /**
+   * start dictionary server
+   *
+   * @param port
+   * @throws Exception
+   */
+  public void startServer(int port) throws Exception {
+    bootstrap = new ServerBootstrap();
+    dictionaryServerHandler = new DictionaryServerHandler();
+
+    ExecutorService boss = Executors.newCachedThreadPool();
+    ExecutorService worker = Executors.newCachedThreadPool();
+
+    bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
+
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline pipeline = Channels.pipeline();
+        pipeline.addLast("ObjectDecoder", new ObjectDecoder(ClassResolvers.cacheDisabled(
+            getClass().getClassLoader())));
+        pipeline.addLast("ObjectEncoder", new ObjectEncoder());
+        pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
+        return pipeline;
+      }
+    });
+    bootstrap.bind(new InetSocketAddress(port));
+    LOGGER.audit("Server Start!");
+  }
+
+  /**
+   * shutdown dictionary server
+   *
+   * @throws Exception
+   */
+  public void shutdown() throws Exception {
+    DictionaryKey key = new DictionaryKey();
+    key.setType("WRITE_DICTIONARY");
+    dictionaryServerHandler.processMessage(key);
+    bootstrap.releaseExternalResources();
+    bootstrap.shutdown();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
new file mode 100644
index 0000000..9160ffe
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.dictionary.server;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.dictionary.generator.ServerDictionaryGenerator;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
+
+import org.jboss.netty.channel.*;
+
+/**
+ * Handler for Dictionary server.
+ */
+public class DictionaryServerHandler extends SimpleChannelHandler {
+
+  private static final LogService LOGGER =
+          LogServiceFactory.getLogService(DictionaryServerHandler.class.getName());
+
+  /**
+   * dictionary generator
+   */
+  private ServerDictionaryGenerator generatorForServer = new ServerDictionaryGenerator();
+
+  /**
+   * channel connected
+   *
+   * @param ctx
+   * @param e
+   * @throws Exception
+   */
+  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+    LOGGER.audit("Connected " + ctx.getHandler());
+  }
+
+  /**
+   * receive message and handle
+   *
+   * @param ctx
+   * @param e
+   * @throws Exception
+   */
+  @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+      throws Exception {
+    byte[] request = (byte[]) e.getMessage();
+    DictionaryKey key = KryoRegister.deserialize(request);
+    int outPut = processMessage(key);
+    key.setData(outPut);
+    // Send back the response
+    byte[] response = KryoRegister.serialize(key);
+    ctx.getChannel().write(response);
+    super.messageReceived(ctx, e);
+  }
+
+  /**
+   * handle exceptions
+   *
+   * @param ctx
+   * @param e
+   */
+  @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+    LOGGER.error("exceptionCaught");
+    e.getCause().printStackTrace();
+    ctx.getChannel().close();
+  }
+
+  /**
+   * process message by message type
+   *
+   * @param key
+   * @return
+   * @throws Exception
+   */
+  public Integer processMessage(DictionaryKey key) throws Exception {
+    switch (key.getType()) {
+      case "DICTIONARY_GENERATION":
+        return generatorForServer.generateKey(key);
+      case "TABLE_INTIALIZATION":
+        generatorForServer.initializeGeneratorForTable(key);
+        return 0;
+      case "SIZE":
+        return generatorForServer.size(key);
+      case "WRITE_DICTIONARY":
+        generatorForServer.writeDictionaryData();
+        return 0;
+      default:
+        return -1;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index fbbed76..78cfc36 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -42,9 +42,11 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import org.apache.carbondata.common.factory.CarbonCommonFactory;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
@@ -67,6 +69,7 @@ import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
+import org.apache.carbondata.core.service.PathService;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.scan.model.QueryDimension;
 
@@ -1355,5 +1358,29 @@ public final class CarbonUtil {
     }
     return outputArray;
   }
+
+
+  /**
+   * This method will check if dictionary and its metadata file exists for a given column
+   *
+   * @param dictionaryColumnUniqueIdentifier unique identifier which contains dbName,
+   *                                         tableName and columnIdentifier
+   * @return
+   */
+  public static boolean isFileExistsForGivenColumn(String carbonStorePath,
+          DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier) {
+    PathService pathService = CarbonCommonFactory.getPathService();
+    CarbonTablePath carbonTablePath = pathService.getCarbonTablePath(carbonStorePath,
+            dictionaryColumnUniqueIdentifier.getCarbonTableIdentifier());
+
+    String dictionaryFilePath =
+            carbonTablePath.getDictionaryFilePath(dictionaryColumnUniqueIdentifier
+                    .getColumnIdentifier().getColumnId());
+    String dictionaryMetadataFilePath =
+            carbonTablePath.getDictionaryMetaFilePath(dictionaryColumnUniqueIdentifier
+                    .getColumnIdentifier().getColumnId());
+    // check if both dictionary and its metadata file exists for a given column
+    return isFileExists(dictionaryFilePath) && isFileExists(dictionaryMetadataFilePath);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 3274e58..5d6badd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -518,13 +518,13 @@ object GlobalDictionaryUtil {
    * @param hdfsLocation    store location on hdfs
    * @param dictFolderPath  generated global dict file path
    */
-  private def generatePredefinedColDictionary(colDictFilePath: String,
+  def generatePredefinedColDictionary(colDictFilePath: String,
       table: CarbonTableIdentifier,
       dimensions: Array[CarbonDimension],
       carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,
       hdfsLocation: String,
-      dictFolderPath: String) = {
+      dictFolderPath: String): Unit = {
     // set pre defined dictionary column
     setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
     val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 461633d..ee96c35 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -720,7 +720,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
-      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT"
+      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT",
+      "SINGLE_PASS"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index d975502..1bfd0c7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBl
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
@@ -341,6 +342,7 @@ object CarbonDataRDDFactory {
       columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       useKettle: Boolean,
+      result: Future[DictionaryServer],
       dataFrame: Option[DataFrame] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val isAgg = false
@@ -759,6 +761,19 @@ object CarbonDataRDDFactory {
           // TODO : Handle it
           LOGGER.info("********Database updated**********")
         }
+
+        // write dictionary file and shutdown dictionary server
+        if (carbonLoadModel.getUseOnePass) {
+          try {
+            result.get().shutdown()
+          } catch {
+            case ex: Exception =>
+              LOGGER.error("Error while close dictionary server and write dictionary file for " +
+                s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+              throw new Exception("Dataload failed due to error while write dictionary file!")
+          }
+        }
+
         LOGGER.audit("Data load is successful for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d13346a..30978e4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -18,10 +18,15 @@
 package org.apache.spark.sql.execution.command
 
 import java.io.File
+import java.util.concurrent.Callable
+import java.util.concurrent.Executors
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Future
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
@@ -41,6 +46,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDime
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
@@ -371,6 +377,24 @@ case class LoadTable(
       carbonLoadModel
         .setBadRecordsAction(
           TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
+      // when single_pass=true, and use_kettle=false, and not use all dict
+      val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
+        case "true" =>
+          if (!useKettle && StringUtils.isEmpty(allDictionaryPath)) {
+            true
+          } else {
+            LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
+              "can not be used together, and USE_KETTLE must be set as false")
+            false
+          }
+        case "false" =>
+          false
+        case illegal =>
+          LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
+            "Please set it as 'true' or 'false'")
+          false
+      }
+      carbonLoadModel.setUseOnePass(useOnePass)
 
       if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
           complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
@@ -387,6 +411,10 @@ case class LoadTable(
       carbonLoadModel.setAllDictPath(allDictionaryPath)
 
       val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+
+      var result: Future[DictionaryServer] = null
+      var executorService: ExecutorService = null
+
       try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -396,9 +424,49 @@ case class LoadTable(
         carbonLoadModel.setColDictFilePath(columnDict)
         carbonLoadModel.setDirectLoad(true)
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
-        GlobalDictionaryUtil
-          .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
-            dataFrame)
+
+        if (carbonLoadModel.getUseOnePass) {
+          val colDictFilePath = carbonLoadModel.getColDictFilePath
+          if (colDictFilePath != null) {
+            val storePath = relation.tableMeta.storePath
+            val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+            val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+              .getCarbonTableIdentifier
+            val carbonTablePath = CarbonStorePath
+              .getCarbonTablePath(storePath, carbonTableIdentifier)
+            val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+            val dimensions = carbonTable.getDimensionByTableName(
+              carbonTable.getFactTableName).asScala.toArray
+            carbonLoadModel.initPredefDictMap()
+            // generate predefined dictionary
+            GlobalDictionaryUtil
+              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
+                dimensions, carbonLoadModel, sqlContext, storePath, dictFolderPath)
+          }
+          // dictionaryServerClient dictionary generator
+          val dictionaryServerPort = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+          carbonLoadModel.setDictionaryServerPort(Integer.parseInt(dictionaryServerPort))
+          val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
+          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+          // start dictionary server when use one pass load.
+          executorService = Executors.newFixedThreadPool(1)
+          result = executorService.submit(new Callable[DictionaryServer]() {
+            @throws[Exception]
+            def call: DictionaryServer = {
+              Thread.currentThread().setName("Dictionary server")
+              val server: DictionaryServer = new DictionaryServer
+              server.startServer(dictionaryServerPort.toInt)
+              server
+            }
+          })
+        } else {
+          GlobalDictionaryUtil
+            .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
+              dataFrame)
+        }
+
         CarbonDataRDDFactory.loadCarbonData(sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
@@ -406,6 +474,7 @@ case class LoadTable(
             columinar,
             partitionStatus,
             useKettle,
+            result,
             dataFrame)
       } catch {
         case ex: Exception =>
@@ -415,6 +484,10 @@ case class LoadTable(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
+          // shutdown dictionary server thread
+          if (carbonLoadModel.getUseOnePass) {
+            executorService.shutdownNow()
+          }
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/test/resources/columndictionary/country.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/columndictionary/country.csv b/integration/spark/src/test/resources/columndictionary/country.csv
new file mode 100644
index 0000000..1fd8032
--- /dev/null
+++ b/integration/spark/src/test/resources/columndictionary/country.csv
@@ -0,0 +1,5 @@
+usa
+china
+uk
+france
+brazil
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/test/resources/columndictionary/name.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/columndictionary/name.csv b/integration/spark/src/test/resources/columndictionary/name.csv
new file mode 100644
index 0000000..cbd6eef
--- /dev/null
+++ b/integration/spark/src/test/resources/columndictionary/name.csv
@@ -0,0 +1,10 @@
+aaa1
+aaa2
+aaa3
+aaa4
+aaa5
+aaa6
+aaa7
+aaa8
+aaa9
+aaa10
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/test/resources/dataIncrement.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/dataIncrement.csv b/integration/spark/src/test/resources/dataIncrement.csv
new file mode 100644
index 0000000..04a84cc
--- /dev/null
+++ b/integration/spark/src/test/resources/dataIncrement.csv
@@ -0,0 +1,21 @@
+ID,date,country,name,phonetype,serialname,salary
+1,2015/7/23,china,aaa1,phone197,ASD69643,15000
+2,2015/7/24,china,aaa2,phone756,ASD42892,15001
+3,2015/7/25,china,aaa3,phone1904,ASD37014,15002
+4,2015/7/26,china,aaa4,phone2435,ASD66902,15003
+5,2015/7/27,china,aaa5,phone2441,ASD90633,15004
+6,2015/7/28,china,aaa6,phone294,ASD59961,15005
+7,2015/7/29,china,aaa7,phone610,ASD14875,15006
+8,2015/7/30,china,aaa8,phone1848,ASD57308,15007
+9,2015/7/18,china,aaa9,phone706,ASD86717,15008
+10,2015/7/19,usa,aaa10,phone685,ASD30505,15009
+1001,2015/7/8,china,aaa1001,phone13181,ASD850271,16001
+1002,2015/9/8,china,aaa1002,phone13182,ASD850272,16002
+1003,2015/8/8,usa,aaa1003,phone13183,ASD850273,16003
+1004,2015/8/8,china,aaa1004,phone13184,ASD850274,16004
+1005,2015/7/8,india,aaa1005,phone13185,ASD850275,16005
+1006,2015/7/8,china,aaa1006,phone13186,ASD850276,16006
+1007,2015/7/8,england,aaa1007,phone13187,ASD850277,16007
+1008,2015/7/8,china,aaa1008,phone13188,ASD850278,16008
+1009,2015/7/8,japan,aaa1009,phone13189,ASD850279,16009
+1010,2015/7/8,china,aaa1010,phone131810,ASD8502710,16010

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
index cb10baf..b9c8a68 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCreateTableWithDouble.scala
@@ -44,6 +44,8 @@ class TestCreateTableWithDouble extends QueryTest with BeforeAndAfterAll {
       .getCanonicalPath + "/src/test/resources/sampleComplex.csv"
     countNum = Array(Row(0))
     doubleField = Array(Row(0))
+    sql("drop table if exists doubleComplex")
+    sql("drop table if exists doubleComplex2")
   }
 
   test("test creating carbon table with double in complex type") {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
new file mode 100644
index 0000000..3e0b4c8
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+  * Test Class for data loading use one pass
+  *
+  */
+class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS table_two_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass_2")
+
+    sql(
+      """
+        |CREATE TABLE table_two_pass (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(
+      """
+        |LOAD DATA local inpath './src/test/resources/dataDiff.csv' INTO TABLE table_two_pass
+        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='false')
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE table_one_pass (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(
+      """
+        |LOAD DATA local inpath './src/test/resources/dataDiff.csv' INTO TABLE table_one_pass
+        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true')
+      """.stripMargin)
+  }
+
+  test("test data loading use one pass") {
+    checkAnswer(
+      sql("select * from table_one_pass"),
+      sql("select * from table_two_pass")
+    )
+  }
+
+  test("test data loading use one pass when offer column dictionary file") {
+    sql(
+      """
+        |CREATE TABLE table_one_pass_2 (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      """
+        |LOAD DATA local inpath './src/test/resources/dataDiff.csv' INTO TABLE table_one_pass_2
+        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true', 'COLUMNDICT'=
+        |'country:./src/test/resources/columndictionary/country.csv, name:./src/test/resources/columndictionary/name.csv')
+      """.stripMargin)
+
+    checkAnswer(
+      sql("select * from table_one_pass_2"),
+      sql("select * from table_two_pass")
+    )
+  }
+
+  test("test data loading use one pass when do incremental load") {
+    sql(
+      """
+        |LOAD DATA local inpath './src/test/resources/dataIncrement.csv' INTO TABLE table_two_pass
+        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='false')
+      """.stripMargin)
+    sql(
+      """
+        |LOAD DATA local inpath './src/test/resources/dataIncrement.csv' INTO TABLE table_one_pass
+        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true')
+      """.stripMargin)
+
+    checkAnswer(
+      sql("select * from table_one_pass"),
+      sql("select * from table_two_pass")
+    )
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS table_two_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass_2")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
index 9b3ac45..7d45e88 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
@@ -178,7 +178,7 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
   
     test("Greater Than equal to Filter with limit") {
     checkAnswer(
-      sql("select id from filtertestTables " + "where id >=999 limit 1"),
+      sql("select id from filtertestTables " + "where id >=999 order by id desc limit 1"),
       Seq(Row(1000))
     )
   }



[3/3] incubator-carbondata git commit: [CARBONDATA-401] One Pass Load This closes #310

Posted by ja...@apache.org.
[CARBONDATA-401] One Pass Load This closes #310


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/20a0b9ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/20a0b9ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/20a0b9ec

Branch: refs/heads/master
Commit: 20a0b9ec55749aff6b63ab9d1d1721af0806e483
Parents: 241f45f 05b2654
Author: jackylk <ja...@huawei.com>
Authored: Thu Dec 29 22:35:01 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Thu Dec 29 22:35:01 2016 +0800

----------------------------------------------------------------------
 .../AbstractColumnDictionaryInfo.java           |   2 +-
 .../dictionary/AbstractDictionaryCache.java     |  25 --
 .../cache/dictionary/ColumnDictionaryInfo.java  |   2 +-
 .../dictionary/ColumnReverseDictionaryInfo.java |   2 +-
 .../cache/dictionary/ForwardDictionary.java     |   4 +-
 .../cache/dictionary/ReverseDictionary.java     |   4 +-
 .../metadata/schema/table/CarbonTable.java      |  62 +++--
 .../core/constants/CarbonCommonConstants.java   |  10 +
 .../dictionary/client/DictionaryClient.java     |  92 +++++++
 .../client/DictionaryClientHandler.java         | 109 +++++++++
 .../dictionary/generator/DictionaryWriter.java  |  29 +++
 .../IncrementalColumnDictionaryGenerator.java   | 241 +++++++++++++++++++
 .../generator/ServerDictionaryGenerator.java    |  74 ++++++
 .../generator/TableDictionaryGenerator.java     | 116 +++++++++
 .../dictionary/generator/key/DictionaryKey.java |  92 +++++++
 .../dictionary/generator/key/KryoRegister.java  |  68 ++++++
 .../dictionary/server/DictionaryServer.java     |  93 +++++++
 .../server/DictionaryServerHandler.java         | 108 +++++++++
 .../apache/carbondata/core/util/CarbonUtil.java |  27 +++
 .../spark/util/GlobalDictionaryUtil.scala       |   4 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  15 ++
 .../execution/command/carbonTableSchema.scala   |  79 +++++-
 .../test/resources/columndictionary/country.csv |   5 +
 .../test/resources/columndictionary/name.csv    |  10 +
 .../spark/src/test/resources/dataIncrement.csv  |  21 ++
 .../complexType/TestCreateTableWithDouble.scala |   2 +
 .../dataload/TestLoadDataWithSinglePass.scala   | 114 +++++++++
 .../filterexpr/FilterProcessorTestCase.scala    |   2 +-
 .../processing/datatypes/PrimitiveDataType.java |  41 +++-
 .../processing/model/CarbonLoadModel.java       |  45 ++++
 .../newflow/CarbonDataLoadConfiguration.java    |  42 ++++
 .../newflow/DataLoadProcessBuilder.java         |  11 +
 .../impl/DictionaryFieldConverterImpl.java      |  52 +++-
 .../converter/impl/FieldEncoderFactory.java     |  23 +-
 .../converter/impl/RowConverterImpl.java        |  45 +++-
 .../DictionaryServerClientDictionary.java       |  95 ++++++++
 37 files changed, 1692 insertions(+), 77 deletions(-)
----------------------------------------------------------------------