You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/24 05:42:18 UTC
[1/2] incubator-carbondata git commit: Optimized and upgraded Dictionary Server
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 3e36cdf54 -> bb0bc2ee2
Optimized and upgraded Dictionary Server
Added multi dictionary clients for each thread
removed comment
Fixed comments
Fixed comments
Fixed style
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/87dade7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/87dade7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/87dade7a
Branch: refs/heads/master
Commit: 87dade7aaf25c5334620f7a4a56d62b3361e4755
Parents: 3e36cdf
Author: ravipesala <ra...@gmail.com>
Authored: Tue Feb 21 06:45:09 2017 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Fri Feb 24 13:40:40 2017 +0800
----------------------------------------------------------------------
core/pom.xml | 5 +
.../dictionary/client/DictionaryClient.java | 58 ++++---
.../client/DictionaryClientHandler.java | 112 +++++++------
.../generator/ServerDictionaryGenerator.java | 14 +-
.../generator/TableDictionaryGenerator.java | 12 +-
.../dictionary/generator/key/DictionaryKey.java | 90 -----------
.../generator/key/DictionaryMessage.java | 157 +++++++++++++++++++
.../generator/key/DictionaryMessageType.java | 38 +++++
.../dictionary/generator/key/KryoRegister.java | 66 --------
.../dictionary/server/DictionaryServer.java | 86 +++++-----
.../server/DictionaryServerHandler.java | 77 +++++----
.../spark/rdd/CarbonDataRDDFactory.scala | 32 ++--
.../spark/rdd/CarbonDataRDDFactory.scala | 38 +++--
.../processing/datatypes/PrimitiveDataType.java | 32 ++--
.../impl/DictionaryFieldConverterImpl.java | 35 +++--
.../converter/impl/FieldEncoderFactory.java | 24 +--
.../converter/impl/RowConverterImpl.java | 98 +++++++++---
.../DictionaryServerClientDictionary.java | 25 ++-
.../steps/DataConverterProcessorStepImpl.java | 22 ++-
...ConverterProcessorWithBucketingStepImpl.java | 21 ++-
.../newflow/steps/SortProcessorStepImpl.java | 4 +-
.../sortandgroupby/sortdata/SortParameters.java | 4 +-
22 files changed, 603 insertions(+), 447 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 1202ab1..5e46af3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -65,6 +65,11 @@
<version>2.3.1</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.1.8.Final</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
index b3f7ccd..d86be99 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
@@ -17,21 +17,17 @@
package org.apache.carbondata.core.dictionary.client;
import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.serialization.ClassResolvers;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
/**
@@ -44,7 +40,7 @@ public class DictionaryClient {
private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();
- private ClientBootstrap clientBootstrap;
+ private NioEventLoopGroup workerGroup;
/**
* start dictionary client
@@ -53,23 +49,19 @@ public class DictionaryClient {
* @param port
*/
public void startClient(String address, int port) {
- clientBootstrap = new ClientBootstrap();
- ExecutorService boss = Executors.newCachedThreadPool();
- ExecutorService worker = Executors.newCachedThreadPool();
- clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));
- clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("ObjectEncoder", new ObjectEncoder());
- pipeline.addLast("ObjectDecoder", new ObjectDecoder(ClassResolvers.cacheDisabled(
- getClass().getClassLoader())));
- pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
- return pipeline;
- }
- });
+ long start = System.currentTimeMillis();
+ workerGroup = new NioEventLoopGroup();
+ Bootstrap clientBootstrap = new Bootstrap();
+ clientBootstrap.group(workerGroup).channel(NioSocketChannel.class)
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
+ }
+ });
clientBootstrap.connect(new InetSocketAddress(address, port));
- LOGGER.audit("Client Start!");
+ LOGGER.info(
+ "Dictionary client Started, Total time spent : " + (System.currentTimeMillis() - start));
}
/**
@@ -78,7 +70,7 @@ public class DictionaryClient {
* @param key
* @return
*/
- public DictionaryKey getDictionary(DictionaryKey key) {
+ public DictionaryMessage getDictionary(DictionaryMessage key) {
return dictionaryClientHandler.getDictionary(key);
}
@@ -86,7 +78,11 @@ public class DictionaryClient {
* shutdown dictionary client
*/
public void shutDown() {
- clientBootstrap.releaseExternalResources();
- clientBootstrap.shutdown();
+ workerGroup.shutdownGracefully();
+ try {
+ workerGroup.terminationFuture().sync();
+ } catch (InterruptedException e) {
+ LOGGER.error(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
index d99dc06..1ed8b36 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClientHandler.java
@@ -16,95 +16,103 @@
*/
package org.apache.carbondata.core.dictionary.client;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
-import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Client handler to get data.
*/
-public class DictionaryClientHandler extends SimpleChannelHandler {
+public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
- final Map<String, BlockingQueue<DictionaryKey>> dictKeyQueueMap = new ConcurrentHashMap<>();
+ private final BlockingQueue<DictionaryMessage> responseMsgQueue = new LinkedBlockingQueue<>();
private ChannelHandlerContext ctx;
- private Object lock = new Object();
+ private DictionaryChannelFutureListener channelFutureListener;
@Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
- LOGGER.audit("Connected " + ctx.getHandler());
- super.channelConnected(ctx, e);
+ channelFutureListener = new DictionaryChannelFutureListener(ctx);
+ LOGGER.audit("Connected client " + ctx);
+ super.channelActive(ctx);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- byte[] response = (byte[]) e.getMessage();
- DictionaryKey key = KryoRegister.deserialize(response);
- BlockingQueue<DictionaryKey> dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
- dictKeyQueue.offer(key);
- super.messageReceived(ctx, e);
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ try {
+ ByteBuf data = (ByteBuf) msg;
+ DictionaryMessage key = new DictionaryMessage();
+ key.readData(data);
+ data.release();
+ responseMsgQueue.offer(key);
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw e;
+ }
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- LOGGER.error("exceptionCaught");
- e.getCause().printStackTrace();
- ctx.getChannel().close();
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ LOGGER.error(cause, "exceptionCaught");
+ ctx.close();
}
/**
* client send request to server
*
- * @param key
- * @return
+ * @param key DictionaryMessage
+ * @return DictionaryMessage
*/
- public DictionaryKey getDictionary(DictionaryKey key) {
- DictionaryKey dictionaryKey;
- BlockingQueue<DictionaryKey> dictKeyQueue = null;
+ public DictionaryMessage getDictionary(DictionaryMessage key) {
+ DictionaryMessage dictionaryMessage;
try {
- synchronized (lock) {
- dictKeyQueue = dictKeyQueueMap.get(key.getThreadNo());
- if (dictKeyQueue == null) {
- dictKeyQueue = new LinkedBlockingQueue<DictionaryKey>();
- dictKeyQueueMap.put(key.getThreadNo(), dictKeyQueue);
- }
- }
- byte[] serialize = KryoRegister.serialize(key);
- ctx.getChannel().write(serialize);
+ ByteBuf buffer = ctx.alloc().buffer();
+ key.writeData(buffer);
+ ctx.writeAndFlush(buffer).addListener(channelFutureListener);
} catch (Exception e) {
- LOGGER.error("Error while send request to server " + e.getMessage());
- ctx.getChannel().close();
+ LOGGER.error(e, "Error while send request to server ");
+ ctx.close();
}
- boolean interrupted = false;
try {
- for (; ; ) {
- try {
- dictionaryKey = dictKeyQueue.take();
- return dictionaryKey;
- } catch (InterruptedException ignore) {
- interrupted = true;
- }
+ dictionaryMessage = responseMsgQueue.poll(100, TimeUnit.SECONDS);
+ if (dictionaryMessage == null) {
+ throw new RuntimeException("Request timed out for key : " + key);
}
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
+ return dictionaryMessage;
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static class DictionaryChannelFutureListener implements ChannelFutureListener {
+
+ private ChannelHandlerContext ctx;
+
+ DictionaryChannelFutureListener(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ LOGGER.error(future.cause(), "Error while sending request to Dictionary Server");
+ ctx.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
index e0cf611..b2b9863 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
@@ -21,29 +21,31 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
/**
* This is the dictionary generator for all tables. It generates dictionary
- * based on @{@link DictionaryKey}.
+ * based on {@link DictionaryMessage}.
*/
-public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, DictionaryKey> {
+public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, DictionaryMessage> {
/**
* the map of tableName to TableDictionaryGenerator
*/
private Map<String, TableDictionaryGenerator> tableMap = new ConcurrentHashMap<>();
- @Override public Integer generateKey(DictionaryKey value) throws DictionaryGenerationException {
+ @Override
+ public Integer generateKey(DictionaryMessage value)
+ throws DictionaryGenerationException {
TableDictionaryGenerator generator = tableMap.get(value.getTableUniqueName());
assert generator != null : "Table initialization for generator is not done";
return generator.generateKey(value);
}
- public void initializeGeneratorForTable(DictionaryKey key) {
+ public void initializeGeneratorForTable(DictionaryMessage key) {
CarbonMetadata metadata = CarbonMetadata.getInstance();
CarbonTable carbonTable = metadata.getCarbonTable(key.getTableUniqueName());
CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
@@ -56,7 +58,7 @@ public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, D
}
}
- public Integer size(DictionaryKey key) {
+ public Integer size(DictionaryMessage key) {
TableDictionaryGenerator generator = tableMap.get(key.getTableUniqueName());
assert generator != null : "Table intialization for generator is not done";
return generator.size(key);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
index 83ca399..add7811 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
* Dictionary generation for table.
*/
public class TableDictionaryGenerator
- implements DictionaryGenerator<Integer, DictionaryKey>, DictionaryWriter {
+ implements DictionaryGenerator<Integer, DictionaryMessage>, DictionaryWriter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(TableDictionaryGenerator.class.getName());
@@ -57,7 +57,9 @@ public class TableDictionaryGenerator
new IncrementalColumnDictionaryGenerator(dimension, 1));
}
- @Override public Integer generateKey(DictionaryKey value) throws DictionaryGenerationException {
+ @Override
+ public Integer generateKey(DictionaryMessage value)
+ throws DictionaryGenerationException {
CarbonMetadata metadata = CarbonMetadata.getInstance();
CarbonTable carbonTable = metadata.getCarbonTable(value.getTableUniqueName());
CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
@@ -65,10 +67,10 @@ public class TableDictionaryGenerator
DictionaryGenerator<Integer, String> generator =
columnMap.get(dimension.getColumnId());
- return generator.generateKey(value.getData().toString());
+ return generator.generateKey(value.getData());
}
- public Integer size(DictionaryKey key) {
+ public Integer size(DictionaryMessage key) {
CarbonMetadata metadata = CarbonMetadata.getInstance();
CarbonTable carbonTable = metadata.getCarbonTable(key.getTableUniqueName());
CarbonDimension dimension = carbonTable.getPrimitiveDimensionByName(
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
deleted file mode 100644
index e1d04d1..0000000
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryKey.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.dictionary.generator.key;
-
-import java.io.Serializable;
-
-/**
- * Dictionary key to generate dictionary
- */
-public class DictionaryKey implements Serializable {
-
- /**
- * tableUniqueName
- */
- private String tableUniqueName;
-
- /**
- * columnName
- */
- private String columnName;
-
- /**
- * message data
- */
- private Object data;
-
- /**
- * message type
- */
- private String type;
-
- /**
- * dictionary client thread no
- */
- private String threadNo;
-
- public String getTableUniqueName() {
- return tableUniqueName;
- }
-
- public String getColumnName() {
- return columnName;
- }
-
- public Object getData() {
- return data;
- }
-
- public void setData(Object data) {
- this.data = data;
- }
-
- public void setThreadNo(String threadNo) {
- this.threadNo = threadNo;
- }
-
- public String getThreadNo() {
- return this.threadNo;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public void setTableUniqueName(String tableUniqueName) {
- this.tableUniqueName = tableUniqueName;
- }
-
- public void setColumnName(String columnName) {
- this.columnName = columnName;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
new file mode 100644
index 0000000..e7b14a2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessage.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator.key;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Dictionary message exchanged between dictionary client and server to generate dictionary
+ */
+public class DictionaryMessage {
+
+ /**
+ * tableUniqueName
+ */
+ private String tableUniqueName;
+
+ /**
+ * columnName
+ */
+ private String columnName;
+
+ /**
+ * message data
+ */
+ private String data;
+
+ /**
+ * Dictionary Value
+ */
+ private int dictionaryValue = CarbonCommonConstants.INVALID_SURROGATE_KEY;
+
+ /**
+ * message type
+ */
+ private DictionaryMessageType type;
+
+ public void readData(ByteBuf byteBuf) {
+ byte[] tableBytes = new byte[byteBuf.readInt()];
+ byteBuf.readBytes(tableBytes);
+ tableUniqueName = new String(tableBytes);
+
+ byte[] colBytes = new byte[byteBuf.readInt()];
+ byteBuf.readBytes(colBytes);
+ columnName = new String(colBytes);
+
+ byte typeByte = byteBuf.readByte();
+ type = getKeyType(typeByte);
+
+ byte dataType = byteBuf.readByte();
+ if (dataType == 0) {
+ dictionaryValue = byteBuf.readInt();
+ } else {
+ byte[] dataBytes = new byte[byteBuf.readInt()];
+ byteBuf.readBytes(dataBytes);
+ data = new String(dataBytes);
+ }
+ }
+
+ public void writeData(ByteBuf byteBuf) {
+ byte[] tableBytes = tableUniqueName.getBytes();
+ byteBuf.writeInt(tableBytes.length);
+ byteBuf.writeBytes(tableBytes);
+
+ byte[] colBytes = columnName.getBytes();
+ byteBuf.writeInt(colBytes.length);
+ byteBuf.writeBytes(colBytes);
+
+ byteBuf.writeByte(type.getType());
+
+ if (dictionaryValue > 0) {
+ byteBuf.writeByte(0);
+ byteBuf.writeInt(dictionaryValue);
+ } else {
+ byteBuf.writeByte(1);
+ byte[] dataBytes = data.getBytes();
+ byteBuf.writeInt(dataBytes.length);
+ byteBuf.writeBytes(dataBytes);
+ }
+ }
+
+ private DictionaryMessageType getKeyType(byte type) {
+ switch (type) {
+ case 1 :
+ return DictionaryMessageType.DICT_GENERATION;
+ case 2 :
+ return DictionaryMessageType.TABLE_INTIALIZATION;
+ case 3 :
+ return DictionaryMessageType.SIZE;
+ case 4 :
+ return DictionaryMessageType.WRITE_DICTIONARY;
+ default:
+ return DictionaryMessageType.DICT_GENERATION;
+ }
+ }
+
+ public String getTableUniqueName() {
+ return tableUniqueName;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+ public DictionaryMessageType getType() {
+ return type;
+ }
+
+ public void setType(DictionaryMessageType type) {
+ this.type = type;
+ }
+
+ public void setTableUniqueName(String tableUniqueName) {
+ this.tableUniqueName = tableUniqueName;
+ }
+
+ public void setColumnName(String columnName) {
+ this.columnName = columnName;
+ }
+
+ public int getDictionaryValue() {
+ return dictionaryValue;
+ }
+
+ public void setDictionaryValue(int dictionaryValue) {
+ this.dictionaryValue = dictionaryValue;
+ }
+
+ @Override public String toString() {
+ return "DictionaryKey{ columnName='"
+ + columnName + '\'' + ", data='" + data + '\'' + ", dictionaryValue=" + dictionaryValue
+ + ", type=" + type + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
new file mode 100644
index 0000000..608b602
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.dictionary.generator.key;
+
+/**
+ * Dictionary message types.
+ */
+public enum DictionaryMessageType {
+
+ DICT_GENERATION((byte) 1),
+ TABLE_INTIALIZATION((byte) 2),
+ SIZE((byte) 3),
+ WRITE_DICTIONARY((byte) 4);
+
+ final byte type;
+
+ DictionaryMessageType(byte type) {
+ this.type = type;
+ }
+
+ public byte getType() {
+ return type;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
deleted file mode 100644
index 6beafd5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/KryoRegister.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.dictionary.generator.key;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Registration;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class KryoRegister {
- /**
- * deserialize byte to DictionaryKey
- *
- * @param bb
- * @return
- */
- public static DictionaryKey deserialize(byte[] bb) {
- Kryo kryo = new Kryo();
- kryo.setReferences(false);
- kryo.setRegistrationRequired(true);
- // register
- Registration registration = kryo.register(DictionaryKey.class);
-
- // deserialize
- Input input = null;
- input = new Input(bb);
- DictionaryKey key = (DictionaryKey) kryo.readObject(input, registration.getType());
- input.close();
- return key;
- }
-
- /**
- * serialize DictionaryKey to byte
- *
- * @param key
- * @return
- */
- public static byte[] serialize(DictionaryKey key) {
- Kryo kryo = new Kryo();
- kryo.setReferences(false);
- kryo.setRegistrationRequired(true);
- // register
- Registration registration = kryo.register(DictionaryKey.class);
- //serialize
- Output output = null;
- output = new Output(1, 4096);
- kryo.writeObject(output, key);
- byte[] bb = output.toBytes();
- output.flush();
- return bb;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
index d667c93..38eda0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -16,22 +16,19 @@
*/
package org.apache.carbondata.core.dictionary.server;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.serialization.ClassResolvers;
-import org.jboss.netty.handler.codec.serialization.ObjectDecoder;
-import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
@@ -42,10 +39,11 @@ public class DictionaryServer {
private static final LogService LOGGER =
LogServiceFactory.getLogService(DictionaryServer.class.getName());
- private ServerBootstrap bootstrap;
-
private DictionaryServerHandler dictionaryServerHandler;
+ private EventLoopGroup boss;
+ private EventLoopGroup worker;
+
/**
* start dictionary server
*
@@ -53,27 +51,31 @@ public class DictionaryServer {
* @throws Exception
*/
public void startServer(int port) {
- bootstrap = new ServerBootstrap();
+ long start = System.currentTimeMillis();
dictionaryServerHandler = new DictionaryServerHandler();
+ boss = new NioEventLoopGroup();
+ worker = new NioEventLoopGroup();
+ // Configure the server.
+ try {
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(boss, worker);
+ bootstrap.channel(NioServerSocketChannel.class);
- ExecutorService boss = Executors.newCachedThreadPool();
- ExecutorService worker = Executors.newCachedThreadPool();
+ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
+ }
+ });
+ bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.bind(port).sync();
- bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));
-
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("ObjectDecoder", new ObjectDecoder(ClassResolvers.cacheDisabled(
- getClass().getClassLoader())));
- pipeline.addLast("ObjectEncoder", new ObjectEncoder());
- pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
- return pipeline;
- }
- });
- bootstrap.bind(new InetSocketAddress(port));
- LOGGER.audit("Server Start!");
+ LOGGER.info("Dictionary Server started, Time spent " + (System.currentTimeMillis() - start)
+ + " Listening on port " + port);
+ } catch (Exception e) {
+ LOGGER.error(e, "Dictionary Server Start Failed");
+ throw new RuntimeException(e);
+ }
}
/**
@@ -82,10 +84,20 @@ public class DictionaryServer {
* @throws Exception
*/
public void shutdown() throws Exception {
- DictionaryKey key = new DictionaryKey();
- key.setType("WRITE_DICTIONARY");
+ worker.shutdownGracefully();
+ boss.shutdownGracefully();
+ // Wait until all threads are terminated.
+ boss.terminationFuture().sync();
+ worker.terminationFuture().sync();
+ }
+
+ /**
+ * Write dictionary to the store.
+ * @throws Exception
+ */
+ public void writeDictionary() throws Exception {
+ DictionaryMessage key = new DictionaryMessage();
+ key.setType(DictionaryMessageType.WRITE_DICTIONARY);
dictionaryServerHandler.processMessage(key);
- bootstrap.releaseExternalResources();
- bootstrap.shutdown();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
index a70566c..a14b675 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
@@ -19,19 +19,18 @@ package org.apache.carbondata.core.dictionary.server;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.dictionary.generator.ServerDictionaryGenerator;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
-import org.apache.carbondata.core.dictionary.generator.key.KryoRegister;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handler for Dictionary server.
*/
-public class DictionaryServerHandler extends SimpleChannelHandler {
+@ChannelHandler.Sharable
+public class DictionaryServerHandler extends ChannelInboundHandlerAdapter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(DictionaryServerHandler.class.getName());
@@ -42,45 +41,45 @@ public class DictionaryServerHandler extends SimpleChannelHandler {
private ServerDictionaryGenerator generatorForServer = new ServerDictionaryGenerator();
/**
- * channel connected
+ * channel registered
*
* @param ctx
- * @param e
* @throws Exception
*/
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOGGER.audit("Connected " + ctx.getHandler());
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ LOGGER.audit("Connected " + ctx);
+ super.channelActive(ctx);
}
- /**
- * receive message and handle
- *
- * @param ctx
- * @param e
- * @throws Exception
- */
- @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- byte[] request = (byte[]) e.getMessage();
- DictionaryKey key = KryoRegister.deserialize(request);
- int outPut = processMessage(key);
- key.setData(outPut);
- // Send back the response
- byte[] response = KryoRegister.serialize(key);
- ctx.getChannel().write(response);
- super.messageReceived(ctx, e);
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ try {
+ ByteBuf data = (ByteBuf) msg;
+ DictionaryMessage key = new DictionaryMessage();
+ key.readData(data);
+ data.release();
+ int outPut = processMessage(key);
+ key.setDictionaryValue(outPut);
+ // Send back the response
+ ByteBuf buffer = ctx.alloc().buffer();
+ key.writeData(buffer);
+ ctx.writeAndFlush(buffer);
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw e;
+ }
}
/**
* handle exceptions
*
* @param ctx
- * @param e
+ * @param cause
*/
- @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- LOGGER.error("exceptionCaught");
- e.getCause().printStackTrace();
- ctx.getChannel().close();
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOGGER.error(cause, "exceptionCaught");
+ ctx.close();
}
/**
@@ -90,16 +89,16 @@ public class DictionaryServerHandler extends SimpleChannelHandler {
* @return
* @throws Exception
*/
- public Integer processMessage(DictionaryKey key) throws Exception {
+ public int processMessage(DictionaryMessage key) throws Exception {
switch (key.getType()) {
- case "DICTIONARY_GENERATION":
+ case DICT_GENERATION :
return generatorForServer.generateKey(key);
- case "TABLE_INTIALIZATION":
+ case TABLE_INTIALIZATION :
generatorForServer.initializeGeneratorForTable(key);
return 0;
- case "SIZE":
+ case SIZE :
return generatorForServer.size(key);
- case "WRITE_DICTIONARY":
+ case WRITE_DICTIONARY :
generatorForServer.writeDictionaryData();
return 0;
default:
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 36dcab4..df6386b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -934,6 +934,7 @@ object CarbonDataRDDFactory {
LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.warn("Cannot write load metadata file as data load failed")
+ shutDownDictionaryServer(carbonLoadModel, result, false)
throw new Exception(errorMessage)
} else {
val metadataDetails = status(0)._2
@@ -948,6 +949,7 @@ object CarbonDataRDDFactory {
.getTableName
}")
LOGGER.error("Dataload failed due to failure in table status updation.")
+ shutDownDictionaryServer(carbonLoadModel, result, false)
throw new Exception(errorMessage)
}
} else if (!carbonLoadModel.isRetentionRequest) {
@@ -955,17 +957,7 @@ object CarbonDataRDDFactory {
LOGGER.info("********Database updated**********")
}
- // write dictionary file and shutdown dictionary server
- if (carbonLoadModel.getUseOnePass) {
- try {
- result.get().shutdown()
- } catch {
- case ex: Exception =>
- LOGGER.error("Error while close dictionary server and write dictionary file for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- throw new Exception("Dataload failed due to error while write dictionary file!")
- }
- }
+ shutDownDictionaryServer(carbonLoadModel, result)
LOGGER.audit("Data load is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -982,4 +974,22 @@ object CarbonDataRDDFactory {
}
+ private def shutDownDictionaryServer(carbonLoadModel: CarbonLoadModel,
+ result: Future[DictionaryServer], writeDictionary: Boolean = true): Unit = {
+ // write dictionary file and shutdown dictionary server
+ if (carbonLoadModel.getUseOnePass) {
+ try {
+ val server = result.get()
+ if (writeDictionary) {
+ server.writeDictionary()
+ }
+ server.shutdown()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Error while close dictionary server and write dictionary file for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ throw new Exception("Dataload failed due to error while write dictionary file!")
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c7f22cc..771e455 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -871,8 +871,10 @@ object CarbonDataRDDFactory {
errorMessage = errorMessage + ": " + executorMessage
}
case _ =>
- executorMessage = ex.getCause.getMessage
- errorMessage = errorMessage + ": " + executorMessage
+ if (ex.getCause != null) {
+ executorMessage = ex.getCause.getMessage
+ errorMessage = errorMessage + ": " + executorMessage
+ }
}
LOGGER.info(errorMessage)
LOGGER.error(ex)
@@ -944,6 +946,7 @@ object CarbonDataRDDFactory {
LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.warn("Cannot write load metadata file as data load failed")
+ shutdownDictionaryServer(carbonLoadModel, result, false)
throw new Exception(errorMessage)
} else {
val metadataDetails = status(0)._2
@@ -958,6 +961,7 @@ object CarbonDataRDDFactory {
.getTableName
}")
LOGGER.error("Dataload failed due to failure in table status updation.")
+ shutdownDictionaryServer(carbonLoadModel, result, false)
throw new Exception(errorMessage)
}
} else if (!carbonLoadModel.isRetentionRequest) {
@@ -965,17 +969,7 @@ object CarbonDataRDDFactory {
LOGGER.info("********Database updated**********")
}
- // write dictionary file and shutdown dictionary server
- if (carbonLoadModel.getUseOnePass) {
- try {
- result.get().shutdown()
- } catch {
- case ex: Exception =>
- LOGGER.error("Error while close dictionary server and write dictionary file for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- throw new Exception("Dataload failed due to error while write dictionary file!")
- }
- }
+ shutdownDictionaryServer(carbonLoadModel, result)
LOGGER.audit("Data load is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
@@ -991,4 +985,22 @@ object CarbonDataRDDFactory {
}
+ private def shutdownDictionaryServer(carbonLoadModel: CarbonLoadModel,
+ result: Future[DictionaryServer], writeDictionary: Boolean = true) = {
+ // write dictionary file and shutdown dictionary server
+ if (carbonLoadModel.getUseOnePass) {
+ try {
+ val server = result.get()
+ if (writeDictionary) {
+ server.writeDictionary()
+ }
+ server.shutdown()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("Error while close dictionary server and write dictionary file for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ throw new Exception("Dataload failed due to error while write dictionary file!")
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 8c3b71e..105f5f4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -21,7 +21,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,7 +31,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -119,10 +119,9 @@ public class PrimitiveDataType implements GenericDataType<Object> {
* @param columnId
*/
public PrimitiveDataType(String name, String parentname, String columnId,
- CarbonDimension carbonDimension,
- Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier,
- DictionaryClient client, Boolean useOnePass, String storePath) {
+ CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+ CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+ String storePath, boolean tableInitialize, Map<Object, Integer> localCache) {
this.name = name;
this.parentname = parentname;
this.columnId = columnId;
@@ -140,20 +139,19 @@ public class PrimitiveDataType implements GenericDataType<Object> {
if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
dictionary = cache.get(identifier);
}
- String threadNo = "initial";
- DictionaryKey dictionaryKey = new DictionaryKey();
- dictionaryKey.setColumnName(carbonDimension.getColName());
- dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
- dictionaryKey.setThreadNo(threadNo);
+ DictionaryMessage dictionaryMessage = new DictionaryMessage();
+ dictionaryMessage.setColumnName(carbonDimension.getColName());
+ dictionaryMessage.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
// for table initialization
- dictionaryKey.setType("TABLE_INTIALIZATION");
- dictionaryKey.setData("0");
- client.getDictionary(dictionaryKey);
- Map<Object, Integer> localCache = new HashMap<>();
+ dictionaryMessage.setType(DictionaryMessageType.TABLE_INTIALIZATION);
+ dictionaryMessage.setData("0");
+ if (tableInitialize) {
+ client.getDictionary(dictionaryMessage);
+ }
// for generate dictionary
- dictionaryKey.setType("DICTIONARY_GENERATION");
+ dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
- dictionaryKey, localCache);
+ dictionaryMessage, localCache);
} else {
dictionary = cache.get(identifier);
dictionaryGenerator = new PreCreatedDictionary(dictionary);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
index 5c037b5..ae5cadb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
@@ -18,7 +18,6 @@
package org.apache.carbondata.processing.newflow.converter.impl;
import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,7 +28,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -51,11 +51,15 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
private String nullFormat;
+ private Dictionary dictionary;
+
+ private DictionaryMessage dictionaryMessage;
+
public DictionaryFieldConverterImpl(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
- DictionaryClient client, Boolean useOnePass, String storePath)
- throws IOException {
+ DictionaryClient client, boolean useOnePass, String storePath, boolean tableInitialize,
+ Map<Object, Integer> localCache) throws IOException {
this.index = index;
this.carbonDimension = (CarbonDimension) dataField.getColumn();
this.nullFormat = nullFormat;
@@ -63,26 +67,24 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType());
- Dictionary dictionary = null;
// if use one pass, use DictionaryServerClientDictionary
if (useOnePass) {
if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
dictionary = cache.get(identifier);
}
- String threadNo = "initial";
- DictionaryKey dictionaryKey = new DictionaryKey();
- dictionaryKey.setColumnName(dataField.getColumn().getColName());
- dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
- dictionaryKey.setThreadNo(threadNo);
+ dictionaryMessage = new DictionaryMessage();
+ dictionaryMessage.setColumnName(dataField.getColumn().getColName());
+ dictionaryMessage.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
// for table initialization
- dictionaryKey.setType("TABLE_INTIALIZATION");
- dictionaryKey.setData("0");
- client.getDictionary(dictionaryKey);
- Map<Object, Integer> localCache = new HashMap<>();
+ dictionaryMessage.setType(DictionaryMessageType.TABLE_INTIALIZATION);
+ dictionaryMessage.setData("0");
+ if (tableInitialize) {
+ client.getDictionary(dictionaryMessage);
+ }
// for generate dictionary
- dictionaryKey.setType("DICTIONARY_GENERATION");
+ dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
- dictionaryKey, localCache);
+ dictionaryMessage, localCache);
} else {
dictionary = cache.get(identifier);
dictionaryGenerator = new PreCreatedDictionary(dictionary);
@@ -107,4 +109,5 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
public void fillColumnCardinality(List<Integer> cardinality) {
cardinality.add(dictionaryGenerator.size());
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index 903be7f..2a7ccec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.newflow.converter.impl;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
@@ -61,7 +62,8 @@ public class FieldEncoderFactory {
public FieldConverter createFieldEncoder(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
- DictionaryClient client, Boolean useOnePass, String storePath)
+ DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
+ Map<Object, Integer> localCache)
throws IOException {
// Converters are only needed for dimensions and measures it return null.
if (dataField.getColumn().isDimesion()) {
@@ -71,11 +73,11 @@ public class FieldEncoderFactory {
} else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
!dataField.getColumn().isComplex()) {
return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
- index, client, useOnePass, storePath);
+ index, client, useOnePass, storePath, tableInitialize, localCache);
} else if (dataField.getColumn().isComplex()) {
return new ComplexFieldConverterImpl(
createComplexType(dataField, cache, carbonTableIdentifier,
- client, useOnePass, storePath), index);
+ client, useOnePass, storePath, tableInitialize, localCache), index);
} else {
return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
}
@@ -89,10 +91,10 @@ public class FieldEncoderFactory {
*/
private static GenericDataType createComplexType(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier,
- DictionaryClient client, Boolean useOnePass, String storePath) {
+ CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+ String storePath, boolean tableInitialize, Map<Object, Integer> localCache) {
return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
- carbonTableIdentifier, client, useOnePass, storePath);
+ carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache);
}
/**
@@ -102,8 +104,8 @@ public class FieldEncoderFactory {
*/
private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
- CarbonTableIdentifier carbonTableIdentifier,
- DictionaryClient client, Boolean useOnePass, String storePath) {
+ CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+ String storePath, boolean tableInitialize, Map<Object, Integer> localCache) {
switch (carbonColumn.getDataType()) {
case ARRAY:
List<CarbonDimension> listOfChildDimensions =
@@ -113,7 +115,7 @@ public class FieldEncoderFactory {
new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
for (CarbonDimension dimension : listOfChildDimensions) {
arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
- carbonTableIdentifier, client, useOnePass, storePath));
+ carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache));
}
return arrayDataType;
case STRUCT:
@@ -124,7 +126,7 @@ public class FieldEncoderFactory {
new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
for (CarbonDimension dimension : dimensions) {
structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
- carbonTableIdentifier, client, useOnePass, storePath));
+ carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache));
}
return structDataType;
case MAP:
@@ -132,7 +134,7 @@ public class FieldEncoderFactory {
default:
return new PrimitiveDataType(carbonColumn.getColName(), parentName,
carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
- carbonTableIdentifier, client, useOnePass, storePath);
+ carbonTableIdentifier, client, useOnePass, storePath, tableInitialize, localCache);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 52cf4c9..cdc69eb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -19,12 +19,16 @@ package org.apache.carbondata.processing.newflow.converter.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.Cache;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
@@ -48,6 +52,9 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
*/
public class RowConverterImpl implements RowConverter {
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RowConverterImpl.class.getName());
+
private CarbonDataLoadConfiguration configuration;
private DataField[] fields;
@@ -58,10 +65,14 @@ public class RowConverterImpl implements RowConverter {
private BadRecordLogHolder logHolder;
- private DictionaryClient dictClient;
+ private List<DictionaryClient> dictClients = new ArrayList<>();
private ExecutorService executorService;
+ private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+
+ private Map<Object, Integer>[] localCaches;
+
public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
BadRecordsLogger badRecordLogger) {
this.fields = fields;
@@ -72,19 +83,38 @@ public class RowConverterImpl implements RowConverter {
@Override
public void initialize() throws IOException {
CacheProvider cacheProvider = CacheProvider.getInstance();
- Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache =
- cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
- configuration.getTableIdentifier().getStorePath());
+ cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
+ configuration.getTableIdentifier().getStorePath());
String nullFormat =
configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
.toString();
List<FieldConverter> fieldConverterList = new ArrayList<>();
-
+ localCaches = new Map[fields.length];
long lruCacheStartTime = System.currentTimeMillis();
+ DictionaryClient client = createDictionaryClient();
+ dictClients.add(client);
+ for (int i = 0; i < fields.length; i++) {
+ localCaches[i] = new ConcurrentHashMap<>();
+ FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
+ .createFieldEncoder(fields[i], cache,
+ configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
+ client, configuration.getUseOnePass(),
+ configuration.getTableIdentifier().getStorePath(), true, localCaches[i]);
+ fieldConverterList.add(fieldConverter);
+ }
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+ .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
+ fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+ logHolder = new BadRecordLogHolder();
+ }
+
+ private DictionaryClient createDictionaryClient() {
// for one pass load, start the dictionary client
if (configuration.getUseOnePass()) {
- executorService = Executors.newFixedThreadPool(1);
+ if (executorService == null) {
+ executorService = Executors.newCachedThreadPool();
+ }
Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
@Override
public DictionaryClient call() throws Exception {
@@ -100,27 +130,17 @@ public class RowConverterImpl implements RowConverter {
// wait for client initialization finished, or will raise null pointer exception
Thread.sleep(1000);
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOGGER.error(e);
+ throw new RuntimeException(e);
}
try {
- dictClient = result.get();
+ return result.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
- for (int i = 0; i < fields.length; i++) {
- FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
- .createFieldEncoder(fields[i], cache,
- configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
- dictClient, configuration.getUseOnePass(),
- configuration.getTableIdentifier().getStorePath());
- fieldConverterList.add(fieldConverter);
- }
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
- fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
- logHolder = new BadRecordLogHolder();
+ return null;
}
@Override
@@ -142,10 +162,12 @@ public class RowConverterImpl implements RowConverter {
@Override
public void finish() {
List<Integer> dimCardinality = new ArrayList<>();
- for (int i = 0; i < fieldConverters.length; i++) {
- if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
- ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
- .fillColumnCardinality(dimCardinality);
+ if (fieldConverters != null) {
+ for (int i = 0; i < fieldConverters.length; i++) {
+ if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
+ ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
+ .fillColumnCardinality(dimCardinality);
+ }
}
}
int[] cardinality = new int[dimCardinality.size()];
@@ -157,7 +179,11 @@ public class RowConverterImpl implements RowConverter {
// close dictionary client when finish write
if (configuration.getUseOnePass()) {
- dictClient.shutDown();
+ for (DictionaryClient client : dictClients) {
+ if (client != null) {
+ client.shutDown();
+ }
+ }
executorService.shutdownNow();
}
}
@@ -166,7 +192,27 @@ public class RowConverterImpl implements RowConverter {
public RowConverter createCopyForNewThread() {
RowConverterImpl converter =
new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger);
- converter.fieldConverters = fieldConverters;
+ List<FieldConverter> fieldConverterList = new ArrayList<>();
+ DictionaryClient client = createDictionaryClient();
+ dictClients.add(client);
+ String nullFormat =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+ .toString();
+ for (int i = 0; i < fields.length; i++) {
+ FieldConverter fieldConverter = null;
+ try {
+ fieldConverter = FieldEncoderFactory.getInstance()
+ .createFieldEncoder(fields[i], cache,
+ configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
+ client, configuration.getUseOnePass(),
+ configuration.getTableIdentifier().getStorePath(), false, localCaches[i]);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ fieldConverterList.add(fieldConverter);
+ }
+ converter.fieldConverters =
+ fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
converter.logHolder = new BadRecordLogHolder();
return converter;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
index 6cfc9a0..c7ff1c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
@@ -23,7 +23,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.devapi.BiDictionary;
import org.apache.carbondata.core.devapi.DictionaryGenerationException;
import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
/**
* Dictionary implementation along with dictionary server client to get new dictionary values
@@ -36,17 +37,15 @@ public class DictionaryServerClientDictionary implements BiDictionary<Integer, O
private Map<Object, Integer> localCache;
- private DictionaryKey dictionaryKey;
+ private DictionaryMessage dictionaryMessage;
private int base;
- private Object lock = new Object();
-
public DictionaryServerClientDictionary(Dictionary dictionary, DictionaryClient client,
- DictionaryKey key, Map<Object, Integer> localCache) {
+ DictionaryMessage key, Map<Object, Integer> localCache) {
this.dictionary = dictionary;
this.client = client;
- this.dictionaryKey = key;
+ this.dictionaryMessage = key;
this.localCache = localCache;
this.base = (dictionary == null ? 0 : dictionary.getDictionaryChunks().getSize() - 1);
}
@@ -54,11 +53,10 @@ public class DictionaryServerClientDictionary implements BiDictionary<Integer, O
@Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
Integer key = getKey(value);
if (key == null) {
- synchronized (lock) {
- dictionaryKey.setData(value);
- dictionaryKey.setThreadNo(Thread.currentThread().getId() + "");
- DictionaryKey dictionaryValue = client.getDictionary(dictionaryKey);
- key = (Integer) dictionaryValue.getData();
+ dictionaryMessage.setData(value.toString());
+ DictionaryMessage dictionaryValue = client.getDictionary(dictionaryMessage);
+ key = dictionaryValue.getDictionaryValue();
+ synchronized (localCache) {
localCache.put(value, key);
}
return key + base;
@@ -85,9 +83,8 @@ public class DictionaryServerClientDictionary implements BiDictionary<Integer, O
}
@Override public int size() {
- dictionaryKey.setType("SIZE");
- int size = (int) client.getDictionary(dictionaryKey).getData()
- + base;
+ dictionaryMessage.setType(DictionaryMessageType.SIZE);
+ int size = client.getDictionary(dictionaryMessage).getDictionaryValue() + base;
return size;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 275f017..ebc659e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -19,7 +19,9 @@ package org.apache.carbondata.processing.newflow.steps;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -42,7 +44,7 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
*/
public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
- private RowConverter converter;
+ private List<RowConverter> converters;
public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
AbstractDataLoadProcessorStep child) {
@@ -57,8 +59,11 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
@Override
public void initialize() throws IOException {
child.initialize();
+ converters = new ArrayList<>();
BadRecordsLogger badRecordLogger = createBadRecordLogger();
- converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+ RowConverter converter =
+ new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+ converters.add(converter);
converter.initialize();
}
@@ -71,8 +76,13 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
@Override
protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
return new CarbonIterator<CarbonRowBatch>() {
- RowConverter localConverter = converter.createCopyForNewThread();
+ private boolean first = true;
+ private RowConverter localConverter;
@Override public boolean hasNext() {
+ if (first) {
+ first = false;
+ localConverter = converters.get(0).createCopyForNewThread();
+ }
return childIter.hasNext();
}
@@ -154,8 +164,10 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
public void close() {
if (!closed) {
super.close();
- if (converter != null) {
- converter.finish();
+ if (converters != null) {
+ for (RowConverter converter : converters) {
+ converter.finish();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index fef4aaa..af66ad7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -48,7 +48,7 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
*/
public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
- private RowConverter converter;
+ private List<RowConverter> converters;
private Partitioner<Object[]> partitioner;
@@ -65,8 +65,11 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
@Override
public void initialize() throws IOException {
child.initialize();
+ converters = new ArrayList<>();
BadRecordsLogger badRecordLogger = createBadRecordLogger();
- converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+ RowConverter converter =
+ new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+ converters.add(converter);
converter.initialize();
List<Integer> indexes = new ArrayList<>();
List<ColumnSchema> columnSchemas = new ArrayList<>();
@@ -95,8 +98,14 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
@Override
protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
return new CarbonIterator<CarbonRowBatch>() {
- RowConverter localConverter = converter.createCopyForNewThread();
+ RowConverter localConverter;
+ private boolean first = true;
@Override public boolean hasNext() {
+ if (first) {
+ first = false;
+ localConverter = converters.get(0).createCopyForNewThread();
+ converters.add(localConverter);
+ }
return childIter.hasNext();
}
@@ -181,8 +190,10 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
public void close() {
if (!closed) {
super.close();
- if (converter != null) {
- converter.finish();
+ if (converters != null) {
+ for (RowConverter converter : converters) {
+ converter.finish();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index bd4b0e6..aca47b6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -87,7 +87,9 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
public void close() {
if (!closed) {
super.close();
- sorter.close();
+ if (sorter != null) {
+ sorter.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/87dade7a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index dc40efe..f618965 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -386,7 +386,7 @@ public class SortParameters {
} catch (NumberFormatException exc) {
numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
}
- parameters.setNumberOfCores(numberOfCores);
+ parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
.getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
@@ -489,7 +489,7 @@ public class SortParameters {
} catch (NumberFormatException exc) {
numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
}
- parameters.setNumberOfCores(numberOfCores);
+ parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties
.getProperty(CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE,
[2/2] incubator-carbondata git commit: [CARBONDATA-715] Optimize
Single-Pass data load flow. This closes #605
Posted by ja...@apache.org.
[CARBONDATA-715] Optimize Single-Pass data load flow. This closes #605
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bb0bc2ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bb0bc2ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bb0bc2ee
Branch: refs/heads/master
Commit: bb0bc2ee22f7a9b86c900171e6c9bcf60a1601b1
Parents: 3e36cdf 87dade7
Author: jackylk <ja...@huawei.com>
Authored: Fri Feb 24 13:42:01 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Feb 24 13:42:01 2017 +0800
----------------------------------------------------------------------
core/pom.xml | 5 +
.../dictionary/client/DictionaryClient.java | 58 ++++---
.../client/DictionaryClientHandler.java | 112 +++++++------
.../generator/ServerDictionaryGenerator.java | 14 +-
.../generator/TableDictionaryGenerator.java | 12 +-
.../dictionary/generator/key/DictionaryKey.java | 90 -----------
.../generator/key/DictionaryMessage.java | 157 +++++++++++++++++++
.../generator/key/DictionaryMessageType.java | 38 +++++
.../dictionary/generator/key/KryoRegister.java | 66 --------
.../dictionary/server/DictionaryServer.java | 86 +++++-----
.../server/DictionaryServerHandler.java | 77 +++++----
.../spark/rdd/CarbonDataRDDFactory.scala | 32 ++--
.../spark/rdd/CarbonDataRDDFactory.scala | 38 +++--
.../processing/datatypes/PrimitiveDataType.java | 32 ++--
.../impl/DictionaryFieldConverterImpl.java | 35 +++--
.../converter/impl/FieldEncoderFactory.java | 24 +--
.../converter/impl/RowConverterImpl.java | 98 +++++++++---
.../DictionaryServerClientDictionary.java | 25 ++-
.../steps/DataConverterProcessorStepImpl.java | 22 ++-
...ConverterProcessorWithBucketingStepImpl.java | 21 ++-
.../newflow/steps/SortProcessorStepImpl.java | 4 +-
.../sortandgroupby/sortdata/SortParameters.java | 4 +-
22 files changed, 603 insertions(+), 447 deletions(-)
----------------------------------------------------------------------