You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/10/01 01:43:26 UTC
[10/20] carbondata git commit: [CARBONDATA-1530] Clean up
carbon-processing module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
deleted file mode 100644
index 82605a5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-
-/**
- * Dictionary implementation along with dictionary server client to get new dictionary values
- */
-public class DictionaryServerClientDictionary implements BiDictionary<Integer, Object> {
-
- private Dictionary dictionary;
-
- private DictionaryClient client;
-
- private Map<Object, Integer> localCache;
-
- private DictionaryMessage dictionaryMessage;
-
- private int base;
-
- public DictionaryServerClientDictionary(Dictionary dictionary, DictionaryClient client,
- DictionaryMessage key, Map<Object, Integer> localCache) {
- this.dictionary = dictionary;
- this.client = client;
- this.dictionaryMessage = key;
- this.localCache = localCache;
- this.base = (dictionary == null ? 0 : dictionary.getDictionaryChunks().getSize() - 1);
- }
-
- @Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
- Integer key = getKey(value);
- if (key == null) {
- dictionaryMessage.setData(value.toString());
- DictionaryMessage dictionaryValue = client.getDictionary(dictionaryMessage);
- key = dictionaryValue.getDictionaryValue();
- synchronized (localCache) {
- localCache.put(value, key);
- }
- return key + base;
- }
- return key;
- }
-
- @Override public Integer getKey(Object value) {
- Integer key = -1;
- if (dictionary != null) {
- key = dictionary.getSurrogateKey(value.toString());
- }
- if (key == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
- key = localCache.get(value);
- if (key != null) {
- return key + base;
- }
- }
- return key;
- }
-
- @Override public Object getValue(Integer key) {
- throw new UnsupportedOperationException("Not supported here");
- }
-
- @Override public int size() {
- dictionaryMessage.setType(DictionaryMessageType.SIZE);
- return client.getDictionary(dictionaryMessage).getDictionaryValue() + base;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
deleted file mode 100644
index e6cd42f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-
-/**
- * It is used for generating dictionary from value itself, like timestamp can be used directly as
- * dictionary.
- */
-public class DirectDictionary implements BiDictionary<Integer, Object> {
-
- private DirectDictionaryGenerator dictionaryGenerator;
-
- public DirectDictionary(DirectDictionaryGenerator dictionaryGenerator) {
- this.dictionaryGenerator = dictionaryGenerator;
- }
-
- @Override
- public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
- Integer key = getKey(value);
- if (key == null) {
- throw new UnsupportedOperationException("trying to add new entry in DirectDictionary");
- }
- return key;
- }
-
- @Override
- public Integer getKey(Object value) {
- return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
- }
-
- @Override
- public Object getValue(Integer key) {
- return dictionaryGenerator.getValueFromSurrogate(key);
- }
-
- @Override public int size() {
- return Integer.MAX_VALUE;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
deleted file mode 100644
index 7b6d5f1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import java.util.Map;
-
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.apache.carbondata.core.devapi.GeneratingBiDictionary;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-
-public class InMemBiDictionary<K, V> extends GeneratingBiDictionary<K, V> {
-
- private BiMap<K, V> biMap;
-
- /**
- * Constructor to create a new dictionary, dictionary key will be generated by specified generator
- * @param generator
- */
- public InMemBiDictionary(DictionaryGenerator generator) {
- super(generator);
- biMap = HashBiMap.create();
- }
-
- /**
- * Constructor to create a pre-created dictionary
- * @param preCreatedDictionary
- */
- public InMemBiDictionary(Map<K, V> preCreatedDictionary) {
- super(new DictionaryGenerator<K, V>() {
- @Override
- public K generateKey(V value) throws DictionaryGenerationException {
- // Since dictionary is provided by preCreated, normally it should not come here
- throw new DictionaryGenerationException(
- "encounter new dictionary value in pre-created dictionary:" + value);
- }
- });
- biMap = HashBiMap.create(preCreatedDictionary);
- }
-
- @Override
- public K getKey(V value) {
- return biMap.inverse().get(value);
- }
-
- @Override
- public V getValue(K key) {
- return biMap.get(key);
- }
-
- @Override
- protected void put(K key, V value) {
- // dictionary is immutable, it is append only
- assert (!biMap.containsKey(key));
- assert (!biMap.containsValue(value));
- biMap.put(key, value);
- }
-
- @Override
- public int size() {
- return biMap.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
deleted file mode 100644
index 19b1cf3..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-
-public class PreCreatedDictionary implements BiDictionary<Integer, Object> {
-
- private Dictionary dictionary;
-
- public PreCreatedDictionary(Dictionary dictionary) {
- this.dictionary = dictionary;
- }
-
- @Override
- public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
- Integer key = getKey(value);
- if (key == null) {
- throw new UnsupportedOperationException("trying to add new entry in PreCreatedDictionary");
- }
- return key;
- }
-
- @Override
- public Integer getKey(Object value) {
- return dictionary.getSurrogateKey(value.toString());
- }
-
- @Override
- public String getValue(Integer key) {
- return dictionary.getDictionaryValueForKey(key);
- }
-
- @Override
- public int size() {
- return dictionary.getDictionaryChunks().getSize();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
deleted file mode 100644
index eb95528..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.exception;
-
-public class BadRecordFoundException extends CarbonDataLoadingException {
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public BadRecordFoundException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public BadRecordFoundException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param t
- */
- public BadRecordFoundException(Throwable t) {
- super(t);
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
deleted file mode 100644
index 6ffdd03..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.exception;
-
-public class CarbonDataLoadingException extends RuntimeException {
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public CarbonDataLoadingException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public CarbonDataLoadingException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param t
- */
- public CarbonDataLoadingException(Throwable t) {
- super(t);
- }
-
- /**
- * getLocalizedMessage
- */
- @Override public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
deleted file mode 100644
index 027b2d0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.exception;
-
-public class NoRetryException extends RuntimeException {
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public NoRetryException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public NoRetryException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param t
- */
- public NoRetryException(Throwable t) {
- super(t);
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
deleted file mode 100644
index c37e782..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-import java.util.List;
-
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.processing.newflow.parser.impl.ArrayParserImpl;
-import org.apache.carbondata.processing.newflow.parser.impl.PrimitiveParserImpl;
-import org.apache.carbondata.processing.newflow.parser.impl.StructParserImpl;
-
-public final class CarbonParserFactory {
-
- /**
- * Create parser for the carbon column.
- *
- * @param carbonColumn
- * @param complexDelimiters
- * @return
- */
- public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
- String nullFormat) {
- return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
- }
-
- /**
- * This method may be called recursively if the carbon column is complex type.
- *
- * @param carbonColumn
- * @param complexDelimiters, these delimiters which are used to separate the complex data types.
- * @param depth It is like depth of tree, if column has children then depth is 1,
- * And depth becomes 2 if children has children.
- * This depth is used select the complex
- * delimiters
- * @return GenericParser
- */
- private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
- String nullFormat, int depth) {
- switch (carbonColumn.getDataType()) {
- case ARRAY:
- List<CarbonDimension> listOfChildDimensions =
- ((CarbonDimension) carbonColumn).getListOfChildDimensions();
- // Create array parser with complex delimiter
- ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
- for (CarbonDimension dimension : listOfChildDimensions) {
- arrayParser
- .addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
- }
- return arrayParser;
- case STRUCT:
- List<CarbonDimension> dimensions =
- ((CarbonDimension) carbonColumn).getListOfChildDimensions();
- // Create struct parser with complex delimiter
- StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
- for (CarbonDimension dimension : dimensions) {
- parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
- }
- return parser;
- case MAP:
- throw new UnsupportedOperationException("Complex type Map is not supported yet");
- default:
- return new PrimitiveParserImpl();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java
deleted file mode 100644
index 60247a1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-/**
- * It parses data string as per complex data type.
- */
-public interface ComplexParser<E> extends GenericParser<E> {
-
- /**
- * Children to this parser.
- * @param parser
- */
- void addChildren(GenericParser parser);
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
deleted file mode 100644
index b745bed..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-/**
- * Parse the data according to implementation, The implementation classes can be struct, array or
- * map datatypes.
- * It remains thread safe as the state of implementation class should not change while
- * calling @{@link GenericParser#parse(Object)} method
- */
-public interface GenericParser<E> {
-
- /**
- * Parse the data as per the delimiter
- * @param data
- * @return
- */
- E parse(Object data);
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java
deleted file mode 100644
index 9795e90..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-/**
- * Parse the complete row at once.
- */
-public interface RowParser {
-
- /**
- * Parse row.
- * @param row input row to be parsed.
- * @return parsed row.
- */
- Object[] parseRow(Object[] row);
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
deleted file mode 100644
index 11bbc78..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import java.util.regex.Pattern;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.complexobjects.ArrayObject;
-import org.apache.carbondata.processing.newflow.parser.ComplexParser;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-
-import org.apache.commons.lang.ArrayUtils;
-
-/**
- * It parses the string to @{@link ArrayObject} using delimiter.
- * It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(Object)} method
- */
-public class ArrayParserImpl implements ComplexParser<ArrayObject> {
-
- private Pattern pattern;
-
- private GenericParser child;
-
- private String nullFormat;
-
- public ArrayParserImpl(String delimiter, String nullFormat) {
- pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
- this.nullFormat = nullFormat;
- }
-
- @Override
- public ArrayObject parse(Object data) {
- if (data != null) {
- String value = data.toString();
- if (!value.isEmpty() && !value.equals(nullFormat)) {
- String[] split = pattern.split(value, -1);
- if (ArrayUtils.isNotEmpty(split)) {
- Object[] array = new Object[split.length];
- for (int i = 0; i < split.length; i++) {
- array[i] = child.parse(split[i]);
- }
- return new ArrayObject(array);
- }
- }
- }
- return null;
- }
-
- @Override
- public void addChildren(GenericParser parser) {
- child = parser;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
deleted file mode 100644
index 2cf5633..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-
-public class PrimitiveParserImpl implements GenericParser<Object> {
-
- @Override
- public Object parse(Object data) {
- return data;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
deleted file mode 100644
index 61e4a31..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-import org.apache.carbondata.processing.newflow.parser.RowParser;
-
-public class RowParserImpl implements RowParser {
-
- private GenericParser[] genericParsers;
-
- private int[] outputMapping;
-
- private int[] inputMapping;
-
- private int numberOfColumns;
-
- public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
- String[] complexDelimiters =
- (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
- String nullFormat =
- configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
- .toString();
- DataField[] input = getInput(configuration);
- genericParsers = new GenericParser[input.length];
- for (int i = 0; i < genericParsers.length; i++) {
- genericParsers[i] =
- CarbonParserFactory.createParser(input[i].getColumn(), complexDelimiters, nullFormat);
- }
- outputMapping = new int[output.length];
- for (int i = 0; i < input.length; i++) {
- for (int j = 0; j < output.length; j++) {
- if (input[i].getColumn().equals(output[j].getColumn())) {
- outputMapping[i] = j;
- break;
- }
- }
- }
- }
-
- public DataField[] getInput(CarbonDataLoadConfiguration configuration) {
- DataField[] fields = configuration.getDataFields();
- String[] header = configuration.getHeader();
- numberOfColumns = header.length;
- DataField[] input = new DataField[fields.length];
- inputMapping = new int[input.length];
- int k = 0;
- for (int i = 0; i < fields.length; i++) {
- for (int j = 0; j < numberOfColumns; j++) {
- if (header[j].equalsIgnoreCase(fields[i].getColumn().getColName())) {
- input[k] = fields[i];
- inputMapping[k] = j;
- k++;
- break;
- }
- }
- }
- return input;
- }
-
- @Override
- public Object[] parseRow(Object[] row) {
- // If number of columns are less in a row then create new array with same size of header.
- if (row.length < numberOfColumns) {
- String[] temp = new String[numberOfColumns];
- System.arraycopy(row, 0, temp, 0, row.length);
- row = temp;
- }
- Object[] out = new Object[genericParsers.length];
- for (int i = 0; i < genericParsers.length; i++) {
- Object obj = row[inputMapping[i]];
- out[outputMapping[i]] = genericParsers[i].parse(obj);
- }
- return out;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
deleted file mode 100644
index 3969d9a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.complexobjects.StructObject;
-import org.apache.carbondata.processing.newflow.parser.ComplexParser;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-
-import org.apache.commons.lang.ArrayUtils;
-
-/**
- * It parses the string to @{@link StructObject} using delimiter.
- * It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(Object)} method
- */
-public class StructParserImpl implements ComplexParser<StructObject> {
-
- private Pattern pattern;
-
- private List<GenericParser> children = new ArrayList<>();
-
- private String nullFormat;
-
- public StructParserImpl(String delimiter, String nullFormat) {
- pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
- this.nullFormat = nullFormat;
- }
-
- @Override
- public StructObject parse(Object data) {
- if (data != null) {
- String value = data.toString();
- if (!value.isEmpty() && !value.equals(nullFormat)) {
- String[] split = pattern.split(value, -1);
- if (ArrayUtils.isNotEmpty(split)) {
- Object[] array = new Object[children.size()];
- for (int i = 0; i < split.length && i < array.length; i++) {
- array[i] = children.get(i).parse(split[i]);
- }
- return new StructObject(array);
- }
- }
- }
- return null;
- }
-
- @Override
- public void addChildren(GenericParser parser) {
- children.add(parser);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java
deleted file mode 100644
index 85de593..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.partition;
-
/**
 * Maps a key to a partition.
 *
 * @param <K> type of the key being partitioned
 */
public interface Partitioner<K> {

  /**
   * Returns the partition that {@code key} belongs to.
   *
   * @param key the key to place
   * @return the partition index chosen by the implementation
   */
  int getPartition(K key);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java
deleted file mode 100644
index 42f48f0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.partition.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.processing.newflow.partition.Partitioner;
-
-/**
- * Hash partitioner implementation
- */
-public class HashPartitionerImpl implements Partitioner<Object[]> {
-
- private int numberOfBuckets;
-
- private Hash[] hashes;
-
- public HashPartitionerImpl(List<Integer> indexes, List<ColumnSchema> columnSchemas,
- int numberOfBuckets) {
- this.numberOfBuckets = numberOfBuckets;
- hashes = new Hash[indexes.size()];
- for (int i = 0; i < indexes.size(); i++) {
- switch (columnSchemas.get(i).getDataType()) {
- case SHORT:
- case INT:
- case LONG:
- hashes[i] = new IntegralHash(indexes.get(i));
- break;
- case DOUBLE:
- case FLOAT:
- case DECIMAL:
- hashes[i] = new DecimalHash(indexes.get(i));
- break;
- default:
- hashes[i] = new StringHash(indexes.get(i));
- }
- }
- }
-
- @Override public int getPartition(Object[] objects) {
- int hashCode = 0;
- for (Hash hash : hashes) {
- hashCode += hash.getHash(objects);
- }
- return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
- }
-
- private interface Hash {
- int getHash(Object[] value);
- }
-
- private static class IntegralHash implements Hash {
-
- private int index;
-
- private IntegralHash(int index) {
- this.index = index;
- }
-
- public int getHash(Object[] value) {
- return value[index] != null ? Long.valueOf(value[index].toString()).hashCode() : 0;
- }
- }
-
- private static class DecimalHash implements Hash {
-
- private int index;
-
- private DecimalHash(int index) {
- this.index = index;
- }
-
- public int getHash(Object[] value) {
- return value[index] != null ? Double.valueOf(value[index].toString()).hashCode() : 0;
- }
- }
-
- private static class StringHash implements Hash {
-
- private int index;
-
- private StringHash(int index) {
- this.index = index;
- }
-
- @Override public int getHash(Object[] value) {
- return value[index] != null ? value[index].hashCode() : 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
deleted file mode 100644
index 1de55e0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.row;
-
-import java.util.NoSuchElementException;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-
-
-/**
- * Batch of rows.
- */
-public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
-
- private CarbonRow[] rowBatch;
-
- private int size = 0;
-
- private int index = 0;
-
- public CarbonRowBatch(int batchSize) {
- this.rowBatch = new CarbonRow[batchSize];
- }
-
- public void addRow(CarbonRow carbonRow) {
- rowBatch[size++] = carbonRow;
- }
-
- public int getSize() {
- return size;
- }
-
- @Override public boolean hasNext() {
- return index < size;
- }
-
- @Override
- public CarbonRow next() throws NoSuchElementException {
- if (hasNext()) {
- return rowBatch[index++];
- }
- throw new NoSuchElementException("no more elements to iterate");
- }
-
- @Override public void remove() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
deleted file mode 100644
index ba96a96..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.row;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-
-/**
- * Batch of sorted rows which are ready to be processed by
- */
-public class CarbonSortBatch extends CarbonRowBatch {
-
- private UnsafeSingleThreadFinalSortFilesMerger iterator;
-
- public CarbonSortBatch(UnsafeSingleThreadFinalSortFilesMerger iterator) {
- super(0);
- this.iterator = iterator;
- }
-
- @Override public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override public CarbonRow next() {
- return new CarbonRow(iterator.next());
- }
-
- @Override public void close() {
- iterator.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
deleted file mode 100644
index 5179baa..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort;
-
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.sort.impl.ThreadStatusObserver;
-
-/**
- * The class defines the common methods used in across various type of sort
- */
-public abstract class AbstractMergeSorter implements Sorter {
- /**
- * instance of thread status observer
- */
- protected ThreadStatusObserver threadStatusObserver;
-
- /**
- * Below method will be used to check error in exception
- */
- public void checkError() {
- if (threadStatusObserver.getThrowable() != null) {
- if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) {
- throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
- } else {
- throw new CarbonDataLoadingException(threadStatusObserver.getThrowable());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
deleted file mode 100644
index 2bf8e16..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Sort scope options
- */
-public class SortScopeOptions {
-
- public static SortScope getSortScope(String sortScope) {
- if (sortScope == null) {
- sortScope = CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT;
- }
- switch (sortScope.toUpperCase()) {
- case "BATCH_SORT":
- return SortScope.BATCH_SORT;
- case "LOCAL_SORT":
- return SortScope.LOCAL_SORT;
- case "GLOBAL_SORT":
- return SortScope.GLOBAL_SORT;
- case "NO_SORT":
- return SortScope.NO_SORT;
- default:
- return SortScope.LOCAL_SORT;
- }
- }
-
- public static boolean isValidSortOption(String sortScope) {
- return CarbonUtil.isValidSortOption(sortScope);
- }
-
- public enum SortScope {
- NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
deleted file mode 100644
index 62434bc..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort;
-
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-public class SortStepRowUtil {
- public static Object[] convertRow(Object[] data, SortParameters parameters) {
- int measureCount = parameters.getMeasureColCount();
- int dimensionCount = parameters.getDimColCount();
- int complexDimensionCount = parameters.getComplexDimColCount();
- int noDictionaryCount = parameters.getNoDictionaryCount();
- boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
-
- // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-
- Object[] holder = new Object[3];
- int index = 0;
- int nonDicIndex = 0;
- int allCount = 0;
- int[] dim = new int[dimensionCount];
- byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
- Object[] measures = new Object[measureCount];
- try {
- // read dimension values
- for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
- if (isNoDictionaryDimensionColumn[i]) {
- nonDicArray[nonDicIndex++] = (byte[]) data[i];
- } else {
- dim[index++] = (int) data[allCount];
- }
- allCount++;
- }
-
- for (int i = 0; i < complexDimensionCount; i++) {
- nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
- allCount++;
- }
-
- index = 0;
-
- // read measure values
- for (int i = 0; i < measureCount; i++) {
- measures[index++] = data[allCount];
- allCount++;
- }
-
- NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-
- // increment number if record read
- } catch (Exception e) {
- throw new RuntimeException("Problem while converting row ", e);
- }
-
- //return out row
- return holder;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
deleted file mode 100644
index 4a2f5f4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort;
-
-import java.util.Iterator;
-
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * This interface sorts all the data of iterators.
- * The life cycle of this interface is initialize -> sort -> close
- */
-public interface Sorter {
-
- /**
- * Initialize sorter with sort parameters.
- *
- * @param sortParameters
- */
- void initialize(SortParameters sortParameters);
-
- /**
- * Sorts the data of all iterators, this iterators can be
- * read parallely depends on implementation.
- *
- * @param iterators array of iterators to read data.
- * @return
- * @throws CarbonDataLoadingException
- */
- Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
- throws CarbonDataLoadingException;
-
- /**
- * Close resources
- */
- void close();
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
deleted file mode 100644
index 39a21ad..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class SorterFactory {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SorterFactory.class.getName());
-
- public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
- boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
- SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
- Sorter sorter;
- if (offheapsort) {
- if (configuration.getBucketingInfo() != null) {
- sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
- configuration.getBucketingInfo());
- } else {
- sorter = new UnsafeParallelReadMergeSorterImpl(counter);
- }
- } else {
- if (configuration.getBucketingInfo() != null) {
- sorter =
- new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
- } else {
- sorter = new ParallelReadMergeSorterImpl(counter);
- }
- }
- if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
- if (configuration.getBucketingInfo() == null) {
- sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
- } else {
- LOGGER.warn(
- "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
- .getName());
- }
- }
- return sorter;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
deleted file mode 100644
index 5a8a2c8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- */
-public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private SortIntermediateFileMerger intermediateFileMerger;
-
- private SingleThreadFinalSortFilesMerger finalMerger;
-
- private AtomicLong rowCounter;
-
- public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
- this.rowCounter = rowCounter;
- }
-
- @Override
- public void initialize(SortParameters sortParameters) {
- this.sortParameters = sortParameters;
- intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
- String[] storeLocations =
- CarbonDataProcessorUtil.getLocalDataFolderLocation(
- sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
- sortParameters.getSegmentId() + "", false, false);
- // Set the data file location
- String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
- File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- finalMerger =
- new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(),
- sortParameters.getDimColCount(),
- sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
- sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
- sortParameters.getNoDictionaryDimnesionColumn(),
- sortParameters.getNoDictionarySortColumn());
- }
-
- @Override
- public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
- throws CarbonDataLoadingException {
- SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger);
- final int batchSize = CarbonProperties.getInstance().getBatchSize();
- try {
- sortDataRow.initialize();
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
- ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
- this.threadStatusObserver = new ThreadStatusObserver(executorService);
-
- try {
- for (int i = 0; i < iterators.length; i++) {
- executorService.execute(
- new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
- threadStatusObserver));
- }
- executorService.shutdown();
- executorService.awaitTermination(2, TimeUnit.DAYS);
- processRowToNextStep(sortDataRow, sortParameters);
- } catch (Exception e) {
- checkError();
- throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
- }
- checkError();
- try {
- intermediateFileMerger.finish();
- intermediateFileMerger = null;
- finalMerger.startFinalMerge();
- } catch (CarbonDataWriterException e) {
- throw new CarbonDataLoadingException(e);
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
-
- // Creates the iterator to read from merge sorter.
- Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
-
- @Override
- public boolean hasNext() {
- return finalMerger.hasNext();
- }
-
- @Override
- public CarbonRowBatch next() {
- int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
- while (finalMerger.hasNext() && counter < batchSize) {
- rowBatch.addRow(new CarbonRow(finalMerger.next()));
- counter++;
- }
- return rowBatch;
- }
- };
- return new Iterator[] { batchIterator };
- }
-
- @Override public void close() {
- if (intermediateFileMerger != null) {
- intermediateFileMerger.close();
- }
- }
-
- /**
- * Below method will be used to process data to next step
- */
- private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
- throws CarbonDataLoadingException {
- if (null == sortDataRows) {
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- return false;
- }
-
- try {
- // start sorting
- sortDataRows.startSorting();
-
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
- System.currentTimeMillis());
- return false;
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- /**
- * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
- */
- private static class SortIteratorThread implements Runnable {
-
- private Iterator<CarbonRowBatch> iterator;
-
- private SortDataRows sortDataRows;
-
- private Object[][] buffer;
-
- private AtomicLong rowCounter;
-
- private ThreadStatusObserver observer;
-
- public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows,
- int batchSize, AtomicLong rowCounter, ThreadStatusObserver observer) {
- this.iterator = iterator;
- this.sortDataRows = sortDataRows;
- this.buffer = new Object[batchSize][];
- this.rowCounter = rowCounter;
- this.observer = observer;
-
- }
-
- @Override
- public void run() {
- try {
- while (iterator.hasNext()) {
- CarbonRowBatch batch = iterator.next();
- int i = 0;
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- if (row != null) {
- buffer[i++] = row.getData();
- }
- }
- if (i > 0) {
- sortDataRows.addRowBatch(buffer, i);
- rowCounter.getAndAdd(i);
- }
- }
- } catch (Exception e) {
- LOGGER.error(e);
- observer.notifyFailed(e);
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index 7e013e0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data separately and write to
- * temp files.
- */
-public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
-
- private SortParameters sortParameters;
-
- private SortIntermediateFileMerger[] intermediateFileMergers;
-
- private BucketingInfo bucketingInfo;
-
- private int sortBufferSize;
-
- private AtomicLong rowCounter;
-
- public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter,
- BucketingInfo bucketingInfo) {
- this.rowCounter = rowCounter;
- this.bucketingInfo = bucketingInfo;
- }
-
- @Override public void initialize(SortParameters sortParameters) {
- this.sortParameters = sortParameters;
- int buffer = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
- sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
- if (sortBufferSize < 100) {
- sortBufferSize = 100;
- }
- }
-
- @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
- throws CarbonDataLoadingException {
- SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
- intermediateFileMergers =
- new SortIntermediateFileMerger[sortDataRows.length];
- try {
- for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
- SortParameters parameters = sortParameters.getCopy();
- parameters.setPartitionID(i + "");
- setTempLocation(parameters);
- parameters.setBufferSize(sortBufferSize);
- intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
- sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
- sortDataRows[i].initialize();
- }
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
- ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
- this.threadStatusObserver = new ThreadStatusObserver(executorService);
- final int batchSize = CarbonProperties.getInstance().getBatchSize();
- try {
- for (int i = 0; i < iterators.length; i++) {
- executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
- this.threadStatusObserver));
- }
- executorService.shutdown();
- executorService.awaitTermination(2, TimeUnit.DAYS);
- processRowToNextStep(sortDataRows, sortParameters);
- } catch (Exception e) {
- checkError();
- throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
- }
- checkError();
- try {
- for (int i = 0; i < intermediateFileMergers.length; i++) {
- intermediateFileMergers[i].finish();
- }
- } catch (CarbonDataWriterException e) {
- throw new CarbonDataLoadingException(e);
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
-
- Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
- for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
- batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
- }
-
- return batchIterator;
- }
-
- private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
- String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), bucketId,
- sortParameters.getSegmentId() + "", false, false);
- // Set the data file location
- String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
- CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
- sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
- sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
- sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
- this.sortParameters.getNoDictionarySortColumn());
- }
-
- @Override public void close() {
- for (int i = 0; i < intermediateFileMergers.length; i++) {
- intermediateFileMergers[i].close();
- }
- }
-
- /**
- * Below method will be used to process data to next step
- */
- private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
- throws CarbonDataLoadingException {
- if (null == sortDataRows || sortDataRows.length == 0) {
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- LOGGER.info("Number of Records was Zero");
- String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
- LOGGER.info(logMessage);
- return false;
- }
-
- try {
- for (int i = 0; i < sortDataRows.length; i++) {
- // start sorting
- sortDataRows[i].startSorting();
- }
- // check any more rows are present
- LOGGER.info("Record Processed For table: " + parameters.getTableName());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
- .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
- return false;
- } catch (CarbonSortKeyAndGroupByException e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- private void setTempLocation(SortParameters parameters) {
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(),
- parameters.getTableName(), parameters.getTaskNo(),
- parameters.getPartitionID(), parameters.getSegmentId(), false, false);
- String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
- CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
- parameters.setTempFileLocation(tmpLocs);
- }
-
- /**
- * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
- */
- private static class SortIteratorThread implements Runnable {
-
- private Iterator<CarbonRowBatch> iterator;
-
- private SortDataRows[] sortDataRows;
-
- private AtomicLong rowCounter;
-
- private ThreadStatusObserver threadStatusObserver;
-
- public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows,
- AtomicLong rowCounter, ThreadStatusObserver observer) {
- this.iterator = iterator;
- this.sortDataRows = sortDataRows;
- this.rowCounter = rowCounter;
- this.threadStatusObserver = observer;
- }
-
- @Override
- public void run() {
- try {
- while (iterator.hasNext()) {
- CarbonRowBatch batch = iterator.next();
- int i = 0;
- while (batch.hasNext()) {
- CarbonRow row = batch.next();
- if (row != null) {
- SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
- synchronized (sortDataRow) {
- sortDataRow.addRow(row.getData());
- rowCounter.getAndAdd(1);
- }
- }
- }
- }
- } catch (Exception e) {
- LOGGER.error(e);
- this.threadStatusObserver.notifyFailed(e);
- }
- }
-
- }
-
- private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
- private String partitionId;
-
- private int batchSize;
-
- private boolean firstRow = true;
-
- public MergedDataIterator(String partitionId, int batchSize) {
- this.partitionId = partitionId;
- this.batchSize = batchSize;
- }
-
- private SingleThreadFinalSortFilesMerger finalMerger;
-
- @Override public boolean hasNext() {
- if (firstRow) {
- firstRow = false;
- finalMerger = getFinalMerger(partitionId);
- finalMerger.startFinalMerge();
- }
- return finalMerger.hasNext();
- }
-
- @Override public CarbonRowBatch next() {
- int counter = 0;
- CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
- while (finalMerger.hasNext() && counter < batchSize) {
- rowBatch.addRow(new CarbonRow(finalMerger.next()));
- counter++;
- }
- return rowBatch;
- }
- }
-}