You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/24 05:42:18 UTC

[1/2] incubator-carbondata git commit: Optimized and upgraded Dictionary Server

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 3e36cdf54 -> bb0bc2ee2


Optimized and upgraded Dictionary Server

Added multiple dictionary clients, one for each thread

removed comment

Fixed comments

Fixed comments

Fixed style


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/87dade7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/87dade7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/87dade7a

Branch: refs/heads/master
Commit: 87dade7aaf25c5334620f7a4a56d62b3361e4755
Parents: 3e36cdf
Author: ravipesala <ra...@gmail.com>
Authored: Tue Feb 21 06:45:09 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Fri Feb 24 13:40:40 2017 +0800

----------------------------------------------------------------------
 core/pom.xml                                    |   5 +
 .../dictionary/client/DictionaryClient.java     |  58 ++++---
 .../client/DictionaryClientHandler.java         | 112 +++++++------
 .../generator/ServerDictionaryGenerator.java    |  14 +-
 .../generator/TableDictionaryGenerator.java     |  12 +-
 .../dictionary/generator/key/DictionaryKey.java |  90 -----------
 .../generator/key/DictionaryMessage.java        | 157 +++++++++++++++++++
 .../generator/key/DictionaryMessageType.java    |  38 +++++
 .../dictionary/generator/key/KryoRegister.java  |  66 --------
 .../dictionary/server/DictionaryServer.java     |  86 +++++-----
 .../server/DictionaryServerHandler.java         |  77 +++++----
 .../spark/rdd/CarbonDataRDDFactory.scala        |  32 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala        |  38 +++--
 .../processing/datatypes/PrimitiveDataType.java |  32 ++--
 .../impl/DictionaryFieldConverterImpl.java      |  35 +++--
 .../converter/impl/FieldEncoderFactory.java     |  24 +--
 .../converter/impl/RowConverterImpl.java        |  98 +++++++++---
 .../DictionaryServerClientDictionary.java       |  25 ++-
 .../steps/DataConverterProcessorStepImpl.java   |  22 ++-
 ...ConverterProcessorWithBucketingStepImpl.java |  21 ++-
 .../newflow/steps/SortProcessorStepImpl.java    |   4 +-
 .../sortandgroupby/sortdata/SortParameters.java |   4 +-
 22 files changed, 603 insertions(+), 447 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 1202ab1..5e46af3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -65,6 +65,11 @@
       <version>2.3.1</version>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.1.8.Final</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
index b3f7ccd..d86be99 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
@@ -17,21 +17,17 @@
 package org.apache.carbondata.core.dictionary.client;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.serialization.ClassResolvers;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 
 /**
@@ -44,7 +40,7 @@ public class DictionaryClient {
 
   private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
 
-  private ClientBootstrap clientBootstrap;
+  private NioEventLoopGroup workerGroup;
 
   /**
    * start dictionary client
@@ -53,23 +49,19 @@ public class DictionaryClient {
    * @param port
    */
   public void startClient(String address, int port) {
-    clientBootstrap = new ClientBootstrap();
-    ExecutorService boss = Executors.newCachedThreadPool();
-    ExecutorService worker = Executors.newCachedThreadPool();
-    clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
-    clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = Channels.pipeline();
-        pipeline.addLast("ObjectEncoder", new ObjectEncoder());
-        pipeline.addLast("ObjectDecoder", new ObjectDecoder(ClassResolvers.cacheDisabled(
-            getClass().getClassLoader())));
-        pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
-        return pipeline;
-      }
-    });
+    long start = System.currentTimeMillis();
+    workerGroup = new NioEventLoopGroup();
+    Bootstrap clientBootstrap = new Bootstrap();
+    clientBootstrap.group(workerGroup).channel(NioSocketChannel.class)
+        .handler(new ChannelInitializer<SocketChannel>() {
+          @Override public void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
+            pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
+          }
+        });
     clientBootstrap.connect(new InetSocketAddress(address, port));
-    LOGGER.audit("Client Start!");
+    LOGGER.info(
+        "Dictionary client Started, Total time spent : " + (System.currentTimeMillis() - start));
   }
 
   /**
@@ -78,7 +70,7 @@ public class DictionaryClient {
    * @param key
    * @return
    */
-  public DictionaryKey getDictionary(DictionaryKey key) {
+  public DictionaryMessage getDictionary(DictionaryMessage key) {
     return dictionaryClientHandler.getDictionary(key);
   }
 
@@ -86,7 +78,11 @@ public class DictionaryClient {
    * shutdown dictionary client
    */
   public void shutDown() {
-    clientBootstrap.releaseExternalResources();
-    clientBootstrap.shutdown();
+    workerGroup.shutdownGracefully();
+    try {
+      workerGroup.terminationFuture().sync();
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
index d99dc06..1ed8b36 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
@@ -16,95 +16,103 @@
  */
 package org.apache.carbondata.core.dictionary.client;
 
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
-import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 
 /**
  * Client handler to get data.
  */
-public class DictionaryClientHandler extends SimpleChannelHandler {
+public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
 
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
 
-  final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap<>();
+  private final BlockingQueue<DictionaryMessage> responseMsgQueue = new LinkedBlockingQueue<>();
 
   private ChannelHandlerContext ctx;
 
-  private Object lock = new Object();
+  private DictionaryChannelFutureListener channelFutureListener;
 
   @Override
-  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
     this.ctx = ctx;
-    LOGGER.audit("Connected " + ctx.getHandler());
-    super.channelConnected(ctx, e);
+    channelFutureListener = new DictionaryChannelFutureListener(ctx);
+    LOGGER.audit("Connected client " + ctx);
+    super.channelActive(ctx);
   }
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-    byte[] response = (byte[]) e.getMessage();
-    DictionaryKey key = KryoRegister.deserialize(response);
-    BlockingQueue<DictionaryKey> dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
-    dictKeyQueue.offer(key);
-    super.messageReceived(ctx, e);
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    try {
+      ByteBuf data = (ByteBuf) msg;
+      DictionaryMessage key = new DictionaryMessage();
+      key.readData(data);
+      data.release();
+      responseMsgQueue.offer(key);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-    LOGGER.error("exceptionCaught");
-    e.getCause().printStackTrace();
-    ctx.getChannel().close();
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    LOGGER.error(cause, "exceptionCaught");
+    ctx.close();
   }
 
   /**
    * client send request to server
    *
-   * @param key
-   * @return
+   * @param key DictionaryMessage
+   * @return DictionaryMessage
    */
-  public DictionaryKey getDictionary(DictionaryKey key) {
-    DictionaryKey dictionaryKey;
-    BlockingQueue<DictionaryKey> dictKeyQueue = null;
+  public DictionaryMessage getDictionary(DictionaryMessage key) {
+    DictionaryMessage dictionaryMessage;
     try {
-      synchronized (lock) {
-        dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
-        if (dictKeyQueue == null) {
-          dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
-          dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
-        }
-      }
-      byte[] serialize = KryoRegister.serialize(key);
-      ctx.getChannel().write(serialize);
+      ByteBuf buffer = ctx.alloc().buffer();
+      key.writeData(buffer);
+      ctx.writeAndFlush(buffer).addListener(channelFutureListener);
     } catch (Exception e) {
-      LOGGER.error("Error while send request to server " + e.getMessage());
-      ctx.getChannel().close();
+      LOGGER.error(e, "Error while send request to server ");
+      ctx.close();
     }
-    boolean interrupted = false;
     try {
-      for (; ; ) {
-        try {
-          dictionaryKey = dictKeyQueue.take();
-          return dictionaryKey;
-        } catch (InterruptedException ignore) {
-          interrupted = true;
-        }
+      dictionaryMessage = responseMsgQueue.poll(100, TimeUnit.SECONDS);
+      if (dictionaryMessage == null) {
+        throw new RuntimeException("Request timed out for key : " + key);
       }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
+      return dictionaryMessage;
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static class DictionaryChannelFutureListener implements ChannelFutureListener {
+
+    private ChannelHandlerContext ctx;
+
+    DictionaryChannelFutureListener(ChannelHandlerContext ctx) {
+      this.ctx = ctx;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (!future.isSuccess()) {
+        LOGGER.error(future.cause(), "Error while sending request to Dictionary Server");
+        ctx.close();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
index e0cf611..b2b9863 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
@@ -21,29 +21,31 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 
 /**
  * This is the dictionary generator for all tables. It generates dictionary
- * based on @{@link DictionaryKey}.
+ * based on {@link DictionaryMessage}.
  */
-public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, DictionaryKey> {
+public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, DictionaryMessage> {
 
   /**
    * the map of tableName to TableDictionaryGenerator
    */
   private Map<String, TableDictionaryGenerator> tableMap = new ConcurrentHashMap<>();
 
-  @Override public Integer generateKey(DictionaryKey value) throws DictionaryGenerationException {
+  @Override
+  public Integer generateKey(DictionaryMessage value)
+      throws DictionaryGenerationException {
     TableDictionaryGenerator generator = tableMap.get(value.getTableUniqueName());
     assert generator != null : "Table initialization for generator is not done";
     return generator.generateKey(value);
   }
 
-  public void initializeGeneratorForTable(DictionaryKey key) {
+  public void initializeGeneratorForTable(DictionaryMessage key) {
     CarbonMetadata metadata = CarbonMetadata.getInstance();
     CarbonTable carbonTable = metadata.getCarbonTable(key.getTableUniqueName());
     CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
@@ -56,7 +58,7 @@ public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, D
     }
   }
 
-  public Integer size(DictionaryKey key) {
+  public Integer size(DictionaryMessage key) {
     TableDictionaryGenerator generator = tableMap.get(key.getTableUniqueName());
     assert generator != null : "Table intialization for generator is not done";
     return generator.size(key);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
index 83ca399..add7811 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
  * Dictionary generation for table.
  */
 public class TableDictionaryGenerator
-    implements DictionaryGenerator<Integer, DictionaryKey>, DictionaryWriter {
+    implements DictionaryGenerator<Integer, DictionaryMessage>, DictionaryWriter {
 
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(TableDictionaryGenerator.class.getName());
@@ -57,7 +57,9 @@ public class TableDictionaryGenerator
             new IncrementalColumnDictionaryGenerator(dimension, 1));
   }
 
-  @Override public Integer generateKey(DictionaryKey value) throws DictionaryGenerationException {
+  @Override
+  public Integer generateKey(DictionaryMessage value)
+      throws DictionaryGenerationException {
     CarbonMetadata metadata = CarbonMetadata.getInstance();
     CarbonTable carbonTable = metadata.getCarbonTable(value.getTableUniqueName());
     CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
@@ -65,10 +67,10 @@ public class TableDictionaryGenerator
 
     DictionaryGenerator<Integer, String> generator =
             columnMap.get(dimension.getColumnId());
-    return generator.generateKey(value.getData().toString());
+    return generator.generateKey(value.getData());
   }
 
-  public Integer size(DictionaryKey key) {
+  public Integer size(DictionaryMessage key) {
     CarbonMetadata metadata = CarbonMetadata.getInstance();
     CarbonTable carbonTable = metadata.getCarbonTable(key.getTableUniqueName());
     CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
deleted file mode 100644
index e1d04d1..0000000
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.dictionary.generator.key;
-
-import java.io.Serializable;
-
-/**
- * Dictionary key to generate dictionary
- */
-public class DictionaryKey implements Serializable {
-
-  /**
-   * tableUniqueName
-   */
-  private String tableUniqueName;
-
-  /**
-   * columnName
-   */
-  private String columnName;
-
-  /**
-   * message data
-   */
-  private Object data;
-
-  /**
-   * message type
-   */
-  private String type;
-
-  /**
-   * dictionary client thread no
-   */
-  private String threadNo;
-
-  public String getTableUniqueName() {
-    return tableUniqueName;
-  }
-
-  public String getColumnName() {
-    return columnName;
-  }
-
-  public Object getData() {
-    return data;
-  }
-
-  public void setData(Object data) {
-    this.data = data;
-  }
-
-  public void setThreadNo(String threadNo) {
-    this.threadNo = threadNo;
-  }
-
-  public String getThreadNo() {
-    return this.threadNo;
-  }
-
-  public String getType() {
-    return type;
-  }
-
-  public void setType(String type) {
-    this.type = type;
-  }
-
-  public void setTableUniqueName(String tableUniqueName) {
-    this.tableUniqueName = tableUniqueName;
-  }
-
-  public void setColumnName(String columnName) {
-    this.columnName = columnName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
new file mode 100644
index 0000000..e7b14a2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator.key;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Dictionary message used to generate dictionary
+ */
+public class DictionaryMessage {
+
+  /**
+   * tableUniqueName
+   */
+  private String tableUniqueName;
+
+  /**
+   * columnName
+   */
+  private String columnName;
+
+  /**
+   * message data
+   */
+  private String data;
+
+  /**
+   * Dictionary Value
+   */
+  private int dictionaryValue = CarbonCommonConstants.INVALID_SURROGATE_KEY;
+
+  /**
+   * message type
+   */
+  private DictionaryMessageType type;
+
+  public void readData(ByteBuf byteBuf) {
+    byte[] tableBytes = new byte[byteBuf.readInt()];
+    byteBuf.readBytes(tableBytes);
+    tableUniqueName = new String(tableBytes);
+
+    byte[] colBytes = new byte[byteBuf.readInt()];
+    byteBuf.readBytes(colBytes);
+    columnName = new String(colBytes);
+
+    byte typeByte = byteBuf.readByte();
+    type = getKeyType(typeByte);
+
+    byte dataType = byteBuf.readByte();
+    if (dataType == 0) {
+      dictionaryValue = byteBuf.readInt();
+    } else {
+      byte[] dataBytes = new byte[byteBuf.readInt()];
+      byteBuf.readBytes(dataBytes);
+      data = new String(dataBytes);
+    }
+  }
+
+  public void writeData(ByteBuf byteBuf) {
+    byte[] tableBytes = tableUniqueName.getBytes();
+    byteBuf.writeInt(tableBytes.length);
+    byteBuf.writeBytes(tableBytes);
+
+    byte[] colBytes = columnName.getBytes();
+    byteBuf.writeInt(colBytes.length);
+    byteBuf.writeBytes(colBytes);
+
+    byteBuf.writeByte(type.getType());
+
+    if (dictionaryValue > 0) {
+      byteBuf.writeByte(0);
+      byteBuf.writeInt(dictionaryValue);
+    } else {
+      byteBuf.writeByte(1);
+      byte[] dataBytes = data.getBytes();
+      byteBuf.writeInt(dataBytes.length);
+      byteBuf.writeBytes(dataBytes);
+    }
+  }
+
+  private DictionaryMessageType getKeyType(byte type) {
+    switch (type) {
+      case 1 :
+        return DictionaryMessageType.DICT_GENERATION;
+      case 2 :
+        return DictionaryMessageType.TABLE_INTIALIZATION;
+      case 3 :
+        return DictionaryMessageType.SIZE;
+      case 4 :
+        return DictionaryMessageType.WRITE_DICTIONARY;
+      default:
+        return DictionaryMessageType.DICT_GENERATION;
+    }
+  }
+
+  public String getTableUniqueName() {
+    return tableUniqueName;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  public String getData() {
+    return data;
+  }
+
+  public void setData(String data) {
+    this.data = data;
+  }
+
+  public DictionaryMessageType getType() {
+    return type;
+  }
+
+  public void setType(DictionaryMessageType type) {
+    this.type = type;
+  }
+
+  public void setTableUniqueName(String tableUniqueName) {
+    this.tableUniqueName = tableUniqueName;
+  }
+
+  public void setColumnName(String columnName) {
+    this.columnName = columnName;
+  }
+
+  public int getDictionaryValue() {
+    return dictionaryValue;
+  }
+
+  public void setDictionaryValue(int dictionaryValue) {
+    this.dictionaryValue = dictionaryValue;
+  }
+
+  @Override public String toString() {
+    return "DictionaryKey{ columnName='"
+        + columnName + '\'' + ", data='" + data + '\'' + ", dictionaryValue=" + dictionaryValue
+        + ", type=" + type + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
new file mode 100644
index 0000000..608b602
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator.key;
+
+/**
+ * Dictionary message types.
+ */
+public enum DictionaryMessageType {
+
+  DICT_GENERATION((byte) 1),
+  TABLE_INTIALIZATION((byte) 2),
+  SIZE((byte) 3),
+  WRITE_DICTIONARY((byte) 4);
+
+  final byte type;
+
+  DictionaryMessageType(byte type) {
+    this.type = type;
+  }
+
+  public byte getType() {
+    return type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
deleted file mode 100644
index 6beafd5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.dictionary.generator.key;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Registration;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class KryoRegister {
-  /**
-   * deserialize byte to DictionaryKey
-   *
-   * @param bb
-   * @return
-   */
-  public static DictionaryKey deserialize(byte[] bb) {
-    Kryo kryo = new Kryo();
-    kryo.setReferences(false);
-    kryo.setRegistrationRequired(true);
-    // register
-    Registration registration = kryo.register(DictionaryKey.class);
-
-    // deserialize
-    Input input = null;
-    input = new Input(bb);
-    DictionaryKey key = (DictionaryKey) kryo.readObject(input, registration.getType());
-    input.close();
-    return key;
-  }
-
-  /**
-   * serialize DictionaryKey to byte
-   *
-   * @param key
-   * @return
-   */
-  public static byte[] serialize(DictionaryKey key) {
-    Kryo kryo = new Kryo();
-    kryo.setReferences(false);
-    kryo.setRegistrationRequired(true);
-    // register
-    Registration registration = kryo.register(DictionaryKey.class);
-    //serialize
-    Output output = null;
-    output = new Output(1, 4096);
-    kryo.writeObject(output, key);
-    byte[] bb = output.toBytes();
-    output.flush();
-    return bb;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
index d667c93..38eda0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -16,22 +16,19 @@
  */
 package org.apache.carbondata.core.dictionary.server;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
 
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.serialization.ClassResolvers;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
 
 
 /**
@@ -42,10 +39,11 @@ public class DictionaryServer {
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(DictionaryServer.class.getName());
 
-  private ServerBootstrap bootstrap;
-
   private DictionaryServerHandler dictionaryServerHandler;
 
+  private EventLoopGroup boss;
+  private EventLoopGroup worker;
+
   /**
    * start dictionary server
    *
@@ -53,27 +51,31 @@ public class DictionaryServer {
    * @throws Exception
    */
   public void startServer(int port) {
-    bootstrap = new ServerBootstrap();
+    long start = System.currentTimeMillis();
     dictionaryServerHandler = new DictionaryServerHandler();
+    boss = new NioEventLoopGroup();
+    worker = new NioEventLoopGroup();
+    // Configure the server.
+    try {
+      ServerBootstrap bootstrap = new ServerBootstrap();
+      bootstrap.group(boss, worker);
+      bootstrap.channel(NioServerSocketChannel.class);
 
-    ExecutorService boss = Executors.newCachedThreadPool();
-    ExecutorService worker = Executors.newCachedThreadPool();
+      bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+        @Override public void initChannel(SocketChannel ch) throws Exception {
+          ChannelPipeline pipeline = ch.pipeline();
+          pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
+        }
+      });
+      bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+      bootstrap.bind(port).sync();
 
-    bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
-
-    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() throws Exception {
-        ChannelPipeline pipeline = Channels.pipeline();
-        pipeline.addLast("ObjectDecoder", new ObjectDecoder(ClassResolvers.cacheDisabled(
-            getClass().getClassLoader())));
-        pipeline.addLast("ObjectEncoder", new ObjectEncoder());
-        pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
-        return pipeline;
-      }
-    });
-    bootstrap.bind(new InetSocketAddress(port));
-    LOGGER.audit("Server Start!");
+      LOGGER.info("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start)
+          + " Listening on port " + port);
+    } catch (Exception e) {
+      LOGGER.error(e, "Dictionary Server Start Failed");
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -82,10 +84,20 @@ public class DictionaryServer {
    * @throws Exception
    */
   public void shutdown() throws Exception {
-    DictionaryKey key = new DictionaryKey();
-    key.setType("WRITE_DICTIONARY");
+    worker.shutdownGracefully();
+    boss.shutdownGracefully();
+    // Wait until all threads are terminated.
+    boss.terminationFuture().sync();
+    worker.terminationFuture().sync();
+  }
+
+  /**
+   * Write dictionary to the store.
+   * @throws Exception
+   */
+  public void writeDictionary() throws Exception {
+    DictionaryMessage key = new DictionaryMessage();
+    key.setType(DictionaryMessageType.WRITE_DICTIONARY);
     dictionaryServerHandler.processMessage(key);
-    bootstrap.releaseExternalResources();
-    bootstrap.shutdown();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
index a70566c..a14b675 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
@@ -19,19 +19,18 @@ package org.apache.carbondata.core.dictionary.server;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.dictionary.generator.ServerDictionaryGenerator;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
-import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 
 /**
  * Handler for Dictionary server.
  */
-public class DictionaryServerHandler extends SimpleChannelHandler {
+@ChannelHandler.Sharable
+public class DictionaryServerHandler extends ChannelInboundHandlerAdapter {
 
   private static final LogService LOGGER =
           LogServiceFactory.getLogService(DictionaryServerHandler.class.getName());
@@ -42,45 +41,45 @@ public class DictionaryServerHandler extends SimpleChannelHandler {
   private ServerDictionaryGenerator generatorForServer = new ServerDictionaryGenerator();
 
   /**
-   * channel connected
+   * channel active (client connected)
    *
    * @param ctx
-   * @param e
    * @throws Exception
    */
-  public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
-    LOGGER.audit("Connected " + ctx.getHandler());
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    LOGGER.audit("Connected " + ctx);
+    super.channelActive(ctx);
   }
 
-  /**
-   * receive message and handle
-   *
-   * @param ctx
-   * @param e
-   * @throws Exception
-   */
-  @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-      throws Exception {
-    byte[] request = (byte[]) e.getMessage();
-    DictionaryKey key = KryoRegister.deserialize(request);
-    int outPut = processMessage(key);
-    key.setData(outPut);
-    // Send back the response
-    byte[] response = KryoRegister.serialize(key);
-    ctx.getChannel().write(response);
-    super.messageReceived(ctx, e);
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    try {
+      ByteBuf data = (ByteBuf) msg;
+      DictionaryMessage key = new DictionaryMessage();
+      key.readData(data);
+      data.release();
+      int outPut = processMessage(key);
+      key.setDictionaryValue(outPut);
+      // Send back the response
+      ByteBuf buffer = ctx.alloc().buffer();
+      key.writeData(buffer);
+      ctx.writeAndFlush(buffer);
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw e;
+    }
   }
 
   /**
    * handle exceptions
    *
    * @param ctx
-   * @param e
+   * @param cause
    */
-  @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    LOGGER.error("exceptionCaught");
-    e.getCause().printStackTrace();
-    ctx.getChannel().close();
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOGGER.error(cause, "exceptionCaught");
+    ctx.close();
   }
 
   /**
@@ -90,16 +89,16 @@ public class DictionaryServerHandler extends SimpleChannelHandler {
    * @return
    * @throws Exception
    */
-  public Integer processMessage(DictionaryKey key) throws Exception {
+  public int processMessage(DictionaryMessage key) throws Exception {
     switch (key.getType()) {
-      case "DICTIONARY_GENERATION":
+      case DICT_GENERATION :
         return generatorForServer.generateKey(key);
-      case "TABLE_INTIALIZATION":
+      case TABLE_INTIALIZATION :
         generatorForServer.initializeGeneratorForTable(key);
         return 0;
-      case "SIZE":
+      case SIZE :
         return generatorForServer.size(key);
-      case "WRITE_DICTIONARY":
+      case WRITE_DICTIONARY :
         generatorForServer.writeDictionaryData();
         return 0;
       default:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 36dcab4..df6386b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -934,6 +934,7 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
+        shutDownDictionaryServer(carbonLoadModel, result, false)
         throw new Exception(errorMessage)
       } else {
         val metadataDetails = status(0)._2
@@ -948,6 +949,7 @@ object CarbonDataRDDFactory {
                       .getTableName
                 }")
             LOGGER.error("Dataload failed due to failure in table status updation.")
+            shutDownDictionaryServer(carbonLoadModel, result, false)
             throw new Exception(errorMessage)
           }
         } else if (!carbonLoadModel.isRetentionRequest) {
@@ -955,17 +957,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********Database updated**********")
         }
 
-        // write dictionary file and shutdown dictionary server
-        if (carbonLoadModel.getUseOnePass) {
-          try {
-            result.get().shutdown()
-          } catch {
-            case ex: Exception =>
-              LOGGER.error("Error while close dictionary server and write dictionary file for " +
-                s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-              throw new Exception("Dataload failed due to error while write dictionary file!")
-          }
-        }
+        shutDownDictionaryServer(carbonLoadModel, result)
 
         LOGGER.audit("Data load is successful for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -982,4 +974,22 @@ object CarbonDataRDDFactory {
 
   }
 
+  private def shutDownDictionaryServer(carbonLoadModel: CarbonLoadModel,
+      result: Future[DictionaryServer], writeDictionary: Boolean = true): Unit = {
+    // write dictionary file (only if writeDictionary is set) and shut down dictionary server
+    if (carbonLoadModel.getUseOnePass) {
+      try {
+        val server = result.get()
+        if (writeDictionary) {
+          server.writeDictionary()
+        }
+        server.shutdown()
+      } catch {
+        case ex: Exception =>
+          LOGGER.error("Error while close dictionary server and write dictionary file for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          throw new Exception("Dataload failed due to error while write dictionary file!")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c7f22cc..771e455 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -871,8 +871,10 @@ object CarbonDataRDDFactory {
                 errorMessage = errorMessage + ": " + executorMessage
               }
             case _ =>
-              executorMessage = ex.getCause.getMessage
-              errorMessage = errorMessage + ": " + executorMessage
+              if (ex.getCause != null) {
+                executorMessage = ex.getCause.getMessage
+                errorMessage = errorMessage + ": " + executorMessage
+              }
           }
           LOGGER.info(errorMessage)
           LOGGER.error(ex)
@@ -944,6 +946,7 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
+        shutdownDictionaryServer(carbonLoadModel, result, false)
         throw new Exception(errorMessage)
       } else {
         val metadataDetails = status(0)._2
@@ -958,6 +961,7 @@ object CarbonDataRDDFactory {
                       .getTableName
                 }")
             LOGGER.error("Dataload failed due to failure in table status updation.")
+            shutdownDictionaryServer(carbonLoadModel, result, false)
             throw new Exception(errorMessage)
           }
         } else if (!carbonLoadModel.isRetentionRequest) {
@@ -965,17 +969,7 @@ object CarbonDataRDDFactory {
           LOGGER.info("********Database updated**********")
         }
 
-        // write dictionary file and shutdown dictionary server
-        if (carbonLoadModel.getUseOnePass) {
-          try {
-            result.get().shutdown()
-          } catch {
-            case ex: Exception =>
-              LOGGER.error("Error while close dictionary server and write dictionary file for " +
-                s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-              throw new Exception("Dataload failed due to error while write dictionary file!")
-          }
-        }
+        shutdownDictionaryServer(carbonLoadModel, result)
         LOGGER.audit("Data load is successful for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
@@ -991,4 +985,22 @@ object CarbonDataRDDFactory {
 
   }
 
+  private def shutdownDictionaryServer(carbonLoadModel: CarbonLoadModel,
+      result: Future[DictionaryServer], writeDictionary: Boolean = true) = {
+    // write dictionary file (only if writeDictionary is set) and shut down dictionary server
+    if (carbonLoadModel.getUseOnePass) {
+      try {
+        val server = result.get()
+        if (writeDictionary) {
+          server.writeDictionary()
+        }
+        server.shutdown()
+      } catch {
+        case ex: Exception =>
+          LOGGER.error("Error while close dictionary server and write dictionary file for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          throw new Exception("Dataload failed due to error while write dictionary file!")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 8c3b71e..105f5f4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -21,7 +21,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -32,7 +31,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -119,10 +119,9 @@ public class PrimitiveDataType implements GenericDataType<Object> {
    * @param columnId
    */
   public PrimitiveDataType(String name, String parentname, String columnId,
-                           CarbonDimension carbonDimension,
-                           Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-                           CarbonTableIdentifier carbonTableIdentifier,
-                           DictionaryClient client, Boolean useOnePass, String storePath) {
+      CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      String storePath, boolean tableInitialize, Map<Object, Integer> localCache) {
     this.name = name;
     this.parentname = parentname;
     this.columnId = columnId;
@@ -140,20 +139,19 @@ public class PrimitiveDataType implements GenericDataType<Object> {
           if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
             dictionary = cache.get(identifier);
           }
-          String threadNo = "initial";
-          DictionaryKey dictionaryKey = new DictionaryKey();
-          dictionaryKey.setColumnName(carbonDimension.getColName());
-          dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
-          dictionaryKey.setThreadNo(threadNo);
+          DictionaryMessage dictionaryMessage = new DictionaryMessage();
+          dictionaryMessage.setColumnName(carbonDimension.getColName());
+          dictionaryMessage.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
           // for table initialization
-          dictionaryKey.setType("TABLE_INTIALIZATION");
-          dictionaryKey.setData("0");
-          client.getDictionary(dictionaryKey);
-          Map<Object, Integer> localCache = new HashMap<>();
+          dictionaryMessage.setType(DictionaryMessageType.TABLE_INTIALIZATION);
+          dictionaryMessage.setData("0");
+          if (tableInitialize) {
+            client.getDictionary(dictionaryMessage);
+          }
           // for generate dictionary
-          dictionaryKey.setType("DICTIONARY_GENERATION");
+          dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
           dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
-                  dictionaryKey, localCache);
+              dictionaryMessage, localCache);
         } else {
           dictionary = cache.get(identifier);
           dictionaryGenerator = new PreCreatedDictionary(dictionary);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
index 5c037b5..ae5cadb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.processing.newflow.converter.impl;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -29,7 +28,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -51,11 +51,15 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
 
   private String nullFormat;
 
+  private Dictionary dictionary;
+
+  private DictionaryMessage dictionaryMessage;
+
   public DictionaryFieldConverterImpl(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
-      DictionaryClient client, Boolean useOnePass, String storePath)
-      throws IOException {
+      DictionaryClient client, boolean useOnePass, String storePath, boolean tableInitialize,
+      Map<Object, Integer> localCache) throws IOException {
     this.index = index;
     this.carbonDimension = (CarbonDimension) dataField.getColumn();
     this.nullFormat = nullFormat;
@@ -63,26 +67,24 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
         new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
             dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType());
 
-    Dictionary dictionary = null;
     // if use one pass, use DictionaryServerClientDictionary
     if (useOnePass) {
       if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
         dictionary = cache.get(identifier);
       }
-      String threadNo = "initial";
-      DictionaryKey dictionaryKey = new DictionaryKey();
-      dictionaryKey.setColumnName(dataField.getColumn().getColName());
-      dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
-      dictionaryKey.setThreadNo(threadNo);
+      dictionaryMessage = new DictionaryMessage();
+      dictionaryMessage.setColumnName(dataField.getColumn().getColName());
+      dictionaryMessage.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
       // for table initialization
-      dictionaryKey.setType("TABLE_INTIALIZATION");
-      dictionaryKey.setData("0");
-      client.getDictionary(dictionaryKey);
-      Map<Object, Integer> localCache = new HashMap<>();
+      dictionaryMessage.setType(DictionaryMessageType.TABLE_INTIALIZATION);
+      dictionaryMessage.setData("0");
+      if (tableInitialize) {
+        client.getDictionary(dictionaryMessage);
+      }
       // for generate dictionary
-      dictionaryKey.setType("DICTIONARY_GENERATION");
+      dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
       dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
-              dictionaryKey, localCache);
+          dictionaryMessage, localCache);
     } else {
       dictionary = cache.get(identifier);
       dictionaryGenerator = new PreCreatedDictionary(dictionary);
@@ -107,4 +109,5 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
   public void fillColumnCardinality(List<Integer> cardinality) {
     cardinality.add(dictionaryGenerator.size());
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index 903be7f..2a7ccec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.newflow.converter.impl;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
@@ -61,7 +62,8 @@ public class FieldEncoderFactory {
   public FieldConverter createFieldEncoder(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
-      DictionaryClient client, Boolean useOnePass, String storePath)
+      DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
+      Map<Object, Integer> localCache)
       throws IOException {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimesion()) {
@@ -71,11 +73,11 @@ public class FieldEncoderFactory {
       } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
           !dataField.getColumn().isComplex()) {
         return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
-            index, client, useOnePass, storePath);
+            index, client, useOnePass, storePath, tableInitialize, localCache);
       } else if (dataField.getColumn().isComplex()) {
         return new ComplexFieldConverterImpl(
             createComplexType(dataField, cache, carbonTableIdentifier,
-                    client, useOnePass, storePath), index);
+                    client, useOnePass, storePath, tableInitialize, localCache), index);
       } else {
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
       }
@@ -89,10 +91,10 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier,
-      DictionaryClient client, Boolean useOnePass, String storePath) {
+      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      String storePath, boolean tableInitialize, Map<Object, Integer> localCache) {
     return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
-        carbonTableIdentifier, client, useOnePass, storePath);
+        carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache);
   }
 
   /**
@@ -102,8 +104,8 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier,
-      DictionaryClient client, Boolean useOnePass, String storePath) {
+      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      String storePath, boolean tableInitialize, Map<Object, Integer> localCache) {
     switch (carbonColumn.getDataType()) {
       case ARRAY:
         List<CarbonDimension> listOfChildDimensions =
@@ -113,7 +115,7 @@ public class FieldEncoderFactory {
             new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
         for (CarbonDimension dimension : listOfChildDimensions) {
           arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath));
+              carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache));
         }
         return arrayDataType;
       case STRUCT:
@@ -124,7 +126,7 @@ public class FieldEncoderFactory {
             new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
         for (CarbonDimension dimension : dimensions) {
           structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath));
+              carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache));
         }
         return structDataType;
       case MAP:
@@ -132,7 +134,7 @@ public class FieldEncoderFactory {
       default:
         return new PrimitiveDataType(carbonColumn.getColName(), parentName,
             carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
-            carbonTableIdentifier, client, useOnePass, storePath);
+            carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 52cf4c9..cdc69eb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -19,12 +19,16 @@ package org.apache.carbondata.processing.newflow.converter.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
@@ -48,6 +52,9 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
  */
 public class RowConverterImpl implements RowConverter {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowConverterImpl.class.getName());
+
   private CarbonDataLoadConfiguration configuration;
 
   private DataField[] fields;
@@ -58,10 +65,14 @@ public class RowConverterImpl implements RowConverter {
 
   private BadRecordLogHolder logHolder;
 
-  private DictionaryClient dictClient;
+  private List<DictionaryClient> dictClients = new ArrayList<>();
 
   private ExecutorService executorService;
 
+  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+
+  private Map<Object, Integer>[] localCaches;
+
   public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
       BadRecordsLogger badRecordLogger) {
     this.fields = fields;
@@ -72,19 +83,38 @@ public class RowConverterImpl implements RowConverter {
   @Override
   public void initialize() throws IOException {
     CacheProvider cacheProvider = CacheProvider.getInstance();
-    Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache =
-        cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
-            configuration.getTableIdentifier().getStorePath());
+    cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
+        configuration.getTableIdentifier().getStorePath());
     String nullFormat =
         configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
             .toString();
     List<FieldConverter> fieldConverterList = new ArrayList<>();
-
+    localCaches = new Map[fields.length];
     long lruCacheStartTime = System.currentTimeMillis();
+    DictionaryClient client = createDictionaryClient();
+    dictClients.add(client);
 
+    for (int i = 0; i < fields.length; i++) {
+      localCaches[i] = new ConcurrentHashMap<>();
+      FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
+          .createFieldEncoder(fields[i], cache,
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
+              client, configuration.getUseOnePass(),
+              configuration.getTableIdentifier().getStorePath(), true, localCaches[i]);
+      fieldConverterList.add(fieldConverter);
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
+    fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+    logHolder = new BadRecordLogHolder();
+  }
+
+  private DictionaryClient createDictionaryClient() {
     // for one pass load, start the dictionary client
     if (configuration.getUseOnePass()) {
-      executorService = Executors.newFixedThreadPool(1);
+      if (executorService == null) {
+        executorService = Executors.newCachedThreadPool();
+      }
       Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
         @Override
         public DictionaryClient call() throws Exception {
@@ -100,27 +130,17 @@ public class RowConverterImpl implements RowConverter {
         // wait for client initialization finished, or will raise null pointer exception
         Thread.sleep(1000);
       } catch (InterruptedException e) {
-        e.printStackTrace();
+        LOGGER.error(e);
+        throw new RuntimeException(e);
       }
 
       try {
-        dictClient = result.get();
+        return result.get();
       } catch (InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
     }
-    for (int i = 0; i < fields.length; i++) {
-      FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
-          .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
-              dictClient, configuration.getUseOnePass(),
-              configuration.getTableIdentifier().getStorePath());
-      fieldConverterList.add(fieldConverter);
-    }
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
-    fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
-    logHolder = new BadRecordLogHolder();
+    return null;
   }
 
   @Override
@@ -142,10 +162,12 @@ public class RowConverterImpl implements RowConverter {
   @Override
   public void finish() {
     List<Integer> dimCardinality = new ArrayList<>();
-    for (int i = 0; i < fieldConverters.length; i++) {
-      if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
-        ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
-            .fillColumnCardinality(dimCardinality);
+    if (fieldConverters != null) {
+      for (int i = 0; i < fieldConverters.length; i++) {
+        if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
+          ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
+              .fillColumnCardinality(dimCardinality);
+        }
       }
     }
     int[] cardinality = new int[dimCardinality.size()];
@@ -157,7 +179,11 @@ public class RowConverterImpl implements RowConverter {
 
     // close dictionary client when finish write
     if (configuration.getUseOnePass()) {
-      dictClient.shutDown();
+      for (DictionaryClient client : dictClients) {
+        if (client != null) {
+          client.shutDown();
+        }
+      }
       executorService.shutdownNow();
     }
   }
@@ -166,7 +192,27 @@ public class RowConverterImpl implements RowConverter {
   public RowConverter createCopyForNewThread() {
     RowConverterImpl converter =
         new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger);
-    converter.fieldConverters = fieldConverters;
+    List<FieldConverter> fieldConverterList = new ArrayList<>();
+    DictionaryClient client = createDictionaryClient();
+    dictClients.add(client);
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    for (int i = 0; i < fields.length; i++) {
+      FieldConverter fieldConverter = null;
+      try {
+        fieldConverter = FieldEncoderFactory.getInstance()
+            .createFieldEncoder(fields[i], cache,
+                configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
+                client, configuration.getUseOnePass(),
+                configuration.getTableIdentifier().getStorePath(), false, localCaches[i]);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      fieldConverterList.add(fieldConverter);
+    }
+    converter.fieldConverters =
+        fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
     converter.logHolder = new BadRecordLogHolder();
     return converter;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
index 6cfc9a0..c7ff1c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
@@ -23,7 +23,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
 
 /**
  * Dictionary implementation along with dictionary server client to get new dictionary values
@@ -36,17 +37,15 @@ public class DictionaryServerClientDictionary implements BiDictionary<Integer, O
 
   private Map<Object, Integer> localCache;
 
-  private DictionaryKey dictionaryKey;
+  private DictionaryMessage dictionaryMessage;
 
   private int base;
 
-  private Object lock = new Object();
-
   public DictionaryServerClientDictionary(Dictionary dictionary, DictionaryClient client,
-      DictionaryKey key, Map<Object, Integer> localCache) {
+      DictionaryMessage key, Map<Object, Integer> localCache) {
     this.dictionary = dictionary;
     this.client = client;
-    this.dictionaryKey = key;
+    this.dictionaryMessage = key;
     this.localCache = localCache;
     this.base = (dictionary == null ? 0 : dictionary.getDictionaryChunks().getSize() - 1);
   }
@@ -54,11 +53,10 @@ public class DictionaryServerClientDictionary implements BiDictionary<Integer, O
   @Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
     Integer key = getKey(value);
     if (key == null) {
-      synchronized (lock) {
-        dictionaryKey.setData(value);
-        dictionaryKey.setThreadNo(Thread.currentThread().getId() + "");
-        DictionaryKey dictionaryValue = client.getDictionary(dictionaryKey);
-        key = (Integer) dictionaryValue.getData();
+      dictionaryMessage.setData(value.toString());
+      DictionaryMessage dictionaryValue = client.getDictionary(dictionaryMessage);
+      key = dictionaryValue.getDictionaryValue();
+      synchronized (localCache) {
         localCache.put(value, key);
       }
       return key + base;
@@ -85,9 +83,8 @@ public class DictionaryServerClientDictionary implements BiDictionary<Integer, O
   }
 
   @Override public int size() {
-    dictionaryKey.setType("SIZE");
-    int size = (int) client.getDictionary(dictionaryKey).getData()
-            + base;
+    dictionaryMessage.setType(DictionaryMessageType.SIZE);
+    int size = client.getDictionary(dictionaryMessage).getDictionaryValue() + base;
     return size;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 275f017..ebc659e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.processing.newflow.steps;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -42,7 +44,7 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
  */
 public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
-  private RowConverter converter;
+  private List<RowConverter> converters;
 
   public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
@@ -57,8 +59,11 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   @Override
   public void initialize() throws IOException {
     child.initialize();
+    converters = new ArrayList<>();
     BadRecordsLogger badRecordLogger = createBadRecordLogger();
-    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    RowConverter converter =
+        new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converters.add(converter);
     converter.initialize();
   }
 
@@ -71,8 +76,13 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   @Override
   protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
     return new CarbonIterator<CarbonRowBatch>() {
-      RowConverter localConverter = converter.createCopyForNewThread();
+      private boolean first = true;
+      private RowConverter localConverter;
       @Override public boolean hasNext() {
+        if (first) {
+          first = false;
+          localConverter = converters.get(0).createCopyForNewThread();
+        }
         return childIter.hasNext();
       }
 
@@ -154,8 +164,10 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   public void close() {
     if (!closed) {
       super.close();
-      if (converter != null) {
-        converter.finish();
+      if (converters != null) {
+        for (RowConverter converter : converters) {
+          converter.finish();
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index fef4aaa..af66ad7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -48,7 +48,7 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
  */
 public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
 
-  private RowConverter converter;
+  private List<RowConverter> converters;
 
   private Partitioner<Object[]> partitioner;
 
@@ -65,8 +65,11 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
   @Override
   public void initialize() throws IOException {
     child.initialize();
+    converters = new ArrayList<>();
     BadRecordsLogger badRecordLogger = createBadRecordLogger();
-    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    RowConverter converter =
+        new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converters.add(converter);
     converter.initialize();
     List<Integer> indexes = new ArrayList<>();
     List<ColumnSchema> columnSchemas = new ArrayList<>();
@@ -95,8 +98,14 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
   @Override
   protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
     return new CarbonIterator<CarbonRowBatch>() {
-      RowConverter localConverter = converter.createCopyForNewThread();
+      RowConverter localConverter;
+      private boolean first = true;
       @Override public boolean hasNext() {
+        if (first) {
+          first = false;
+          localConverter = converters.get(0).createCopyForNewThread();
+          converters.add(localConverter);
+        }
         return childIter.hasNext();
       }
 
@@ -181,8 +190,10 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
   public void close() {
     if (!closed) {
       super.close();
-      if (converter != null) {
-        converter.finish();
+      if (converters != null) {
+        for (RowConverter converter : converters) {
+          converter.finish();
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index bd4b0e6..aca47b6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -87,7 +87,9 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
   public void close() {
     if (!closed) {
       super.close();
-      sorter.close();
+      if (sorter != null) {
+        sorter.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index dc40efe..f618965 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -386,7 +386,7 @@ public class SortParameters {
     } catch (NumberFormatException exc) {
       numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
     }
-    parameters.setNumberOfCores(numberOfCores);
+    parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
     parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
         .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
@@ -489,7 +489,7 @@ public class SortParameters {
     } catch (NumberFormatException exc) {
       numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
     }
-    parameters.setNumberOfCores(numberOfCores);
+    parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
     parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
         .getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,


[2/2] incubator-carbondata git commit: [CARBONDATA-715] Optimize Single-Pass data load flow This closes #605

Posted by ja...@apache.org.
[CARBONDATA-715] Optimize Single-Pass data load flow. This closes #605


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bb0bc2ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bb0bc2ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bb0bc2ee

Branch: refs/heads/master
Commit: bb0bc2ee22f7a9b86c900171e6c9bcf60a1601b1
Parents: 3e36cdf 87dade7
Author: jackylk <ja...@huawei.com>
Authored: Fri Feb 24 13:42:01 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Feb 24 13:42:01 2017 +0800

----------------------------------------------------------------------
 core/pom.xml                                    |   5 +
 .../dictionary/client/DictionaryClient.java     |  58 ++++---
 .../client/DictionaryClientHandler.java         | 112 +++++++------
 .../generator/ServerDictionaryGenerator.java    |  14 +-
 .../generator/TableDictionaryGenerator.java     |  12 +-
 .../dictionary/generator/key/DictionaryKey.java |  90 -----------
 .../generator/key/DictionaryMessage.java        | 157 +++++++++++++++++++
 .../generator/key/DictionaryMessageType.java    |  38 +++++
 .../dictionary/generator/key/KryoRegister.java  |  66 --------
 .../dictionary/server/DictionaryServer.java     |  86 +++++-----
 .../server/DictionaryServerHandler.java         |  77 +++++----
 .../spark/rdd/CarbonDataRDDFactory.scala        |  32 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala        |  38 +++--
 .../processing/datatypes/PrimitiveDataType.java |  32 ++--
 .../impl/DictionaryFieldConverterImpl.java      |  35 +++--
 .../converter/impl/FieldEncoderFactory.java     |  24 +--
 .../converter/impl/RowConverterImpl.java        |  98 +++++++++---
 .../DictionaryServerClientDictionary.java       |  25 ++-
 .../steps/DataConverterProcessorStepImpl.java   |  22 ++-
 ...ConverterProcessorWithBucketingStepImpl.java |  21 ++-
 .../newflow/steps/SortProcessorStepImpl.java    |   4 +-
 .../sortandgroupby/sortdata/SortParameters.java |   4 +-
 22 files changed, 603 insertions(+), 447 deletions(-)
----------------------------------------------------------------------