You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:44 UTC
[06/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
deleted file mode 100644
index 61639d3..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Partition extends Serializable {
- /**
- * unique identification for the partition in the cluster.
- */
- String getUniqueID();
-
- /**
- * File path for the raw data represented by this partition
- */
- String getFilePath();
-
- /**
- * result
- *
- * @return
- */
- List<String> getFilesPath();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
deleted file mode 100644
index bc6e54f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DataPartitionerProperties {
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
-
- private static DataPartitionerProperties instance;
-
- private Properties properties;
-
- private DataPartitionerProperties() {
- properties = loadProperties();
- }
-
- public static DataPartitionerProperties getInstance() {
- if (instance == null) {
- instance = new DataPartitionerProperties();
- }
- return instance;
- }
-
- public String getValue(String key, String defaultVal) {
- return properties.getProperty(key, defaultVal);
- }
-
- public String getValue(String key) {
- return properties.getProperty(key);
- }
-
- /**
- * Read the properties from CSVFilePartitioner.properties
- */
- private Properties loadProperties() {
- Properties props = new Properties();
-
- File file = new File("DataPartitioner.properties");
- FileInputStream fis = null;
- try {
- if (file.exists()) {
- fis = new FileInputStream(file);
-
- props.load(fis);
- }
- } catch (Exception e) {
- LOGGER
- .error(e, e.getMessage());
- } finally {
- if (null != fis) {
- try {
- fis.close();
- } catch (IOException e) {
- LOGGER.error(e,
- e.getMessage());
- }
- }
- }
-
- return props;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
deleted file mode 100644
index 9bee8a2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-/**
- * A sample load balancer to distribute the partitions to the available nodes in a round robin mode.
- */
-public class DefaultLoadBalancer {
- private Map<String, List<Partition>> nodeToPartitonMap =
- new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- private Map<Partition, String> partitonToNodeMap =
- new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
- //Per form a round robin allocation
- int nodeCount = nodes.size();
-
- int partitioner = 0;
- for (Partition partition : partitions) {
- int nodeindex = partitioner % nodeCount;
- String node = nodes.get(nodeindex);
-
- List<Partition> oldList = nodeToPartitonMap.get(node);
- if (oldList == null) {
- oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- nodeToPartitonMap.put(node, oldList);
- }
- oldList.add(partition);
-
- partitonToNodeMap.put(partition, node);
-
- partitioner++;
- }
- }
-
- public List<Partition> getPartitionsForNode(String node) {
- return nodeToPartitonMap.get(node);
- }
-
- public String getNodeForPartitions(Partition partition) {
- return partitonToNodeMap.get(partition);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
deleted file mode 100644
index bd7cc42..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionImpl implements Partition {
- private static final long serialVersionUID = 3020172346383028547L;
- private String uniqueID;
- private String folderPath;
-
-
- public PartitionImpl(String uniqueID, String folderPath) {
- this.uniqueID = uniqueID;
- this.folderPath = folderPath;
- }
-
- @Override public String getUniqueID() {
- return uniqueID;
- }
-
- @Override public String getFilePath() {
- return folderPath;
- }
-
- @Override public String toString() {
- return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
- }
-
- @Override public List<String> getFilesPath() {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
deleted file mode 100644
index de32b5c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionMultiFileImpl implements Partition {
- private static final long serialVersionUID = -4363447826181193976L;
- private String uniqueID;
- private List<String> folderPath;
-
- public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
- this.uniqueID = uniqueID;
- this.folderPath = folderPath;
- }
-
- @Override public String getUniqueID() {
- // TODO Auto-generated method stub
- return uniqueID;
- }
-
- @Override public String getFilePath() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override public List<String> getFilesPath() {
- // TODO Auto-generated method stub
- return folderPath;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
deleted file mode 100644
index e05be7d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-
-public final class QueryPartitionHelper {
- private static QueryPartitionHelper instance = new QueryPartitionHelper();
- private Map<String, DataPartitioner> partitionerMap =
- new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- private Map<String, DefaultLoadBalancer> loadBalancerMap =
- new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
- private QueryPartitionHelper() {
-
- }
-
- public static QueryPartitionHelper getInstance() {
- return instance;
- }
-
- /**
- * Get partitions applicable for query based on filters applied in query
- */
- public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
- String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
-
- DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
- List<Partition> queryPartitions = dataPartitioner.getPartitions(queryPlan);
- return queryPartitions;
- }
-
- public List<Partition> getAllPartitions(String databaseName, String tableName) {
- String tableUniqueName = databaseName + '_' + tableName;
-
- DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
- return dataPartitioner.getAllPartitions();
- }
-
- /**
- * Get the node name where the partition is assigned to.
- */
- public String getLocation(Partition partition, String databaseName, String tableName) {
- String tableUniqueName = databaseName + '_' + tableName;
-
- DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
- return loadBalancer.getNodeForPartitions(partition);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
deleted file mode 100644
index c9b434a..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-import org.apache.spark.sql.execution.command.Partitioner;
-
-/**
- * Sample partition.
- */
-public class SampleDataPartitionerImpl implements DataPartitioner {
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(SampleDataPartitionerImpl.class.getName());
- private int numberOfPartitions = 1;
-
- private int partionColumnIndex = -1;
-
- private String partitionColumn;
-
- private Partitioner partitioner;
- private List<Partition> allPartitions;
- private String baseLocation;
-
- public SampleDataPartitionerImpl() {
- }
-
- public void initialize(String basePath, String[] columns, Partitioner partitioner) {
- this.partitioner = partitioner;
- numberOfPartitions = partitioner.partitionCount();
-
- partitionColumn = partitioner.partitionColumn()[0];
- LOGGER.info("SampleDataPartitionerImpl initializing with following properties.");
- LOGGER.info("partitionCount: " + numberOfPartitions);
- LOGGER.info("partitionColumn: " + partitionColumn);
- LOGGER.info("basePath: " + basePath);
- LOGGER.info("columns: " + Arrays.toString(columns));
-
- this.baseLocation = basePath;
- allPartitions = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
- for (int i = 0; i < columns.length; i++) {
- if (columns[i].equalsIgnoreCase(partitionColumn)) {
- partionColumnIndex = i;
- break;
- }
- }
-
- for (int partionCounter = 0; partionCounter < numberOfPartitions; partionCounter++) {
- PartitionImpl partitionImpl =
- new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter);
-
- List<Object> includedHashes = new ArrayList<Object>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- includedHashes.add(partionCounter);
-
- allPartitions.add(partitionImpl);
- }
- }
-
- @Override public Partition getPartionForTuple(Object[] tuple, long rowCounter) {
- int hashCode;
- if (partionColumnIndex == -1) {
- hashCode = hashCode(rowCounter);
- } else {
- try {
- hashCode = hashCode(((String) tuple[partionColumnIndex]).hashCode());
- } catch (NumberFormatException e) {
- hashCode = hashCode(0);
- }
- }
- return allPartitions.get(hashCode);
- }
-
- /**
- *
- */
- public List<Partition> getAllPartitions() {
- return allPartitions;
- }
-
- /**
- * @see DataPartitioner#getPartitions(CarbonQueryPlan)
- */
- public List<Partition> getPartitions(CarbonQueryPlan queryPlan) {
- // TODO: this has to be redone during partitioning implmentatation
- return allPartitions;
- }
-
- /**
- * Identify the partitions applicable for the given filter
- */
- public List<Partition> getPartitions() {
- return allPartitions;
-
- // TODO: this has to be redone during partitioning implementation
- // for (Partition aPartition : allPartitions) {
- // CarbonDimensionLevelFilter partitionFilterDetails =
- // aPartition.getPartitionDetails().get(partitionColumn);
- //
- // //Check if the partition is serving any of the
- // //hash code generated for include filter of query
- // for (Object includeFilter : msisdnFilter.getIncludeFilter()) {
- // int hashCode = hashCode(((String) includeFilter).hashCode());
- // if (partitionFilterDetails.getIncludeFilter().contains(hashCode)) {
- // allowedPartitions.add(aPartition);
- // break;
- // }
- // }
- // }
-
- }
-
- private int hashCode(long key) {
- return (int) (Math.abs(key) % numberOfPartitions);
- }
-
- @Override public String[] getPartitionedColumns() {
- return new String[] { partitionColumn };
- }
-
- @Override public Partitioner getPartitioner() {
- return partitioner;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
deleted file mode 100644
index bb8fc5c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.readsupport;
-
-import java.sql.Timestamp;
-
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
-
- @Override public void initialize(CarbonColumn[] carbonColumns,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- super.initialize(carbonColumns, absoluteTableIdentifier);
- //can initialize and generate schema here.
- }
-
- @Override public Row readRow(Object[] data) {
- for (int i = 0; i < dictionaries.length; i++) {
- if (dictionaries[i] != null) {
- data[i] = DataTypeUtil
- .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
- dataTypes[i]);
- switch (dataTypes[i]) {
- case STRING:
- data[i] = UTF8String.fromString(data[i].toString());
- break;
- case TIMESTAMP:
- data[i] = new Timestamp((long) data[i] / 1000);
- break;
- case LONG:
- data[i] = data[i];
- break;
- default:
- }
- } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- //convert the long to timestamp in case of direct dictionary column
- if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
- data[i] = new Timestamp((long) data[i] / 1000);
- }
- }
- }
- return new GenericRow(data);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java b/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
deleted file mode 100644
index 3fb24e2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- *
- */
-package org.apache.carbondata.spark.splits;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-import org.apache.hadoop.io.Writable;
-
-
-/**
- * It represents one region server as one split.
- */
-public class TableSplit implements Serializable, Writable {
- private static final long serialVersionUID = -8058151330863145575L;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(TableSplit.class.getName());
- private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
- private Partition partition;
-
- /**
- * @return the locations
- */
- public List<String> getLocations() {
- return locations;
- }
-
- /**
- * @param locations the locations to set
- */
- public void setLocations(List<String> locations) {
- this.locations = locations;
- }
-
- /**
- * @return Returns the partitions.
- */
- public Partition getPartition() {
- return partition;
- }
-
- /**
- * @param partition The partitions to set.
- */
- public void setPartition(Partition partition) {
- this.partition = partition;
- }
-
- @Override public void readFields(DataInput in) throws IOException {
-
- int sizeLoc = in.readInt();
- for (int i = 0; i < sizeLoc; i++) {
- byte[] b = new byte[in.readInt()];
- in.readFully(b);
- locations.add(new String(b, Charset.defaultCharset()));
- }
-
- byte[] buf = new byte[in.readInt()];
- in.readFully(buf);
- ByteArrayInputStream bis = new ByteArrayInputStream(buf);
- ObjectInputStream ois = new ObjectInputStream(bis);
- try {
- partition = (Partition) ois.readObject();
- } catch (ClassNotFoundException e) {
- LOGGER.error(e, e.getMessage());
- }
- ois.close();
- }
-
- @Override public void write(DataOutput out) throws IOException {
-
- int sizeLoc = locations.size();
- out.writeInt(sizeLoc);
- for (int i = 0; i < sizeLoc; i++) {
- byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset());
- out.writeInt(bytes.length);
- out.write(bytes);
- }
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- ObjectOutputStream obs = new ObjectOutputStream(bos);
- obs.writeObject(partition);
- obs.close();
- byte[] byteArray = bos.toByteArray();
- out.writeInt(byteArray.length);
- out.write(byteArray);
- }
-
- public String toString() {
- return partition.getUniqueID() + ' ' + locations;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
deleted file mode 100644
index d2e716f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
-import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
-import org.apache.carbondata.spark.splits.TableSplit;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * This utilty parses the Carbon query plan to actual query model object.
- */
-public final class CarbonQueryUtil {
-
- private CarbonQueryUtil() {
-
- }
-
- /**
- * It creates the one split for each region server.
- */
- public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
- CarbonQueryPlan queryPlan) throws IOException {
-
- //Just create splits depends on locations of region servers
- List<Partition> allPartitions = null;
- if (queryPlan == null) {
- allPartitions =
- QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
- } else {
- allPartitions =
- QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
- }
- TableSplit[] splits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < splits.length; i++) {
- splits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location = QueryPartitionHelper.getInstance()
- .getLocation(partition, databaseName, tableName);
- locations.add(location);
- splits[i].setPartition(partition);
- splits[i].setLocations(locations);
- }
-
- return splits;
- }
-
- /**
- * It creates the one split for each region server.
- */
- public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
-
- //Just create splits depends on locations of region servers
- DefaultLoadBalancer loadBalancer = null;
- List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
- loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
- TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
- for (int i = 0; i < tblSplits.length; i++) {
- tblSplits[i] = new TableSplit();
- List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Partition partition = allPartitions.get(i);
- String location = loadBalancer.getNodeForPartitions(partition);
- locations.add(location);
- tblSplits[i].setPartition(partition);
- tblSplits[i].setLocations(locations);
- }
- return tblSplits;
- }
-
- /**
- * split sourcePath by comma
- */
- public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
- String separator) {
- if (StringUtils.isNotEmpty(sourcePath)) {
- String[] files = sourcePath.split(separator);
- for (String file : files) {
- partitionsFiles.add(file);
- }
- }
- }
-
- private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
- List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
- List<Partition> partitionList =
- new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
- Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
- partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
- partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
- for (int i = 0; i < files.size(); i++) {
- partitionFiles.get(i % 1).add(files.get(i));
- }
- return partitionList;
- }
-
- public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
- List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- if (null != details) {
- for (LoadMetadataDetails oneLoad : details) {
- if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
- String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
- slices.add(loadName);
- }
- }
- }
- return slices;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
deleted file mode 100644
index 11cf9f8..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name : Carbon
- * Module Name : CARBON Data Processor
- * Author : R00903928
- * Created Date : 15-Sep-2015
- * FileName : LoadMetadataUtil.java
- * Description : Kettle step to generate MD Key
- * Class Version : 1.0
- */
-package org.apache.carbondata.spark.util;
-
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-
-public final class LoadMetadataUtil {
- private LoadMetadataUtil() {
-
- }
-
- public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) {
- CarbonTable table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
-
- String metaDataLocation = table.getMetaDataFilepath();
- LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
- if (details != null && details.length != 0) {
- for (LoadMetadataDetails oneRow : details) {
- if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
- || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
- && oneRow.getVisibility().equalsIgnoreCase("true")) {
- return true;
- }
- }
- }
-
- return false;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
deleted file mode 100644
index ea97bca..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
- /**
- * Carbon column validator
- */
-class CarbonColumnValidator extends ColumnValidator {
- def validateColumns(allColumns: Seq[ColumnSchema]) {
- allColumns.foreach { columnSchema =>
- val colWithSameId = allColumns.filter { x =>
- x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
- }
- if (colWithSameId.size > 1) {
- throw new MalformedCarbonCommandException("Two column can not have same columnId")
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index a1a2ecb..c464538 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -63,7 +63,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
*/
private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
// temporary solution: write to csv file, then load the csv into carbon
- val storePath = CarbonEnv.getInstance(cc).carbonCatalog.storePath
+ val storePath = CarbonEnv.get.carbonMetastore.storePath
val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
.append("tempCSV")
.append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
deleted file mode 100644
index 3162f80..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper, CarbonAliasDecoderRelation}
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
-
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
-import org.apache.carbondata.scan.expression.conditional._
-import org.apache.carbondata.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * All filter conversions are done here.
- */
-object CarbonFilters {
-
- /**
- * Converts data sources filters to carbon filter predicates.
- */
- def createCarbonFilter(schema: StructType,
- predicate: sources.Filter): Option[CarbonExpression] = {
- val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
-
- def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
- predicate match {
-
- case sources.EqualTo(name, value) =>
- Some(new EqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.Not(sources.EqualTo(name, value)) =>
- Some(new NotEqualsExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
-
- case sources.EqualNullSafe(name, value) =>
- Some(new EqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.Not(sources.EqualNullSafe(name, value)) =>
- Some(new NotEqualsExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
-
- case sources.GreaterThan(name, value) =>
- Some(new GreaterThanExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.LessThan(name, value) =>
- Some(new LessThanExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.GreaterThanOrEqual(name, value) =>
- Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
- case sources.LessThanOrEqual(name, value) =>
- Some(new LessThanEqualToExpression(getCarbonExpression(name),
- getCarbonLiteralExpression(name, value)))
-
- case sources.In(name, values) =>
- Some(new InExpression(getCarbonExpression(name),
- new ListExpression(
- convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
- case sources.Not(sources.In(name, values)) =>
- Some(new NotInExpression(getCarbonExpression(name),
- new ListExpression(
- convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
-
- case sources.And(lhs, rhs) =>
- (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
-
- case sources.Or(lhs, rhs) =>
- for {
- lhsFilter <- createFilter(lhs)
- rhsFilter <- createFilter(rhs)
- } yield {
- new OrExpression(lhsFilter, rhsFilter)
- }
-
- case _ => None
- }
- }
-
- def getCarbonExpression(name: String) = {
- new CarbonColumnExpression(name,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
- }
-
- def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
- new CarbonLiteralExpression(value,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
- }
-
- createFilter(predicate)
- }
-
-
- // Check out which filters can be pushed down to carbon, remaining can be handled in spark layer.
- // Mostly dimension filters are only pushed down since it is faster in carbon.
- def selectFilters(filters: Seq[Expression],
- attrList: java.util.HashSet[AttributeReferenceWrapper],
- aliasMap: CarbonAliasDecoderRelation): Unit = {
- def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
- expr match {
- case or@ Or(left, right) =>
-
- val leftFilter = translate(left, or = true)
- val rightFilter = translate(right, or = true)
- if (leftFilter.isDefined && rightFilter.isDefined) {
- Some( sources.Or(leftFilter.get, rightFilter.get))
- } else {
- or.collect {
- case attr: AttributeReference =>
- attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- None
- }
-
- case And(left, right) =>
- (translate(left) ++ translate(right)).reduceOption(sources.And)
-
- case EqualTo(a: Attribute, Literal(v, t)) =>
- Some(sources.EqualTo(a.name, v))
- case EqualTo(l@Literal(v, t), a: Attribute) =>
- Some(sources.EqualTo(a.name, v))
- case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
- Some(sources.EqualTo(a.name, v))
- case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
- Some(sources.EqualTo(a.name, v))
-
- case Not(EqualTo(a: Attribute, Literal(v, t))) => new
- Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Literal(v, t), a: Attribute)) => new
- Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
- Some(sources.Not(sources.EqualTo(a.name, v)))
- case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
- Some(sources.Not(sources.EqualTo(a.name, v)))
- case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
- case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
- case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
- Some(sources.Not(sources.In(a.name, hSet.toArray)))
- case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
- Some(sources.In(a.name, hSet.toArray))
- case Not(In(Cast(a: Attribute, _), list))
- if !list.exists(!_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
- Some(sources.Not(sources.In(a.name, hSet.toArray)))
- case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
- Some(sources.In(a.name, hSet.toArray))
-
- case GreaterThan(a: Attribute, Literal(v, t)) =>
- Some(sources.GreaterThan(a.name, v))
- case GreaterThan(Literal(v, t), a: Attribute) =>
- Some(sources.LessThan(a.name, v))
- case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
- Some(sources.GreaterThan(a.name, v))
- case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
- Some(sources.LessThan(a.name, v))
-
- case LessThan(a: Attribute, Literal(v, t)) =>
- Some(sources.LessThan(a.name, v))
- case LessThan(Literal(v, t), a: Attribute) =>
- Some(sources.GreaterThan(a.name, v))
- case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
- Some(sources.LessThan(a.name, v))
- case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
- Some(sources.GreaterThan(a.name, v))
-
- case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
- case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
- Some(sources.LessThanOrEqual(a.name, v))
- case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
- case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
- Some(sources.LessThanOrEqual(a.name, v))
-
- case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
- Some(sources.LessThanOrEqual(a.name, v))
- case LessThanOrEqual(Literal(v, t), a: Attribute) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
- case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
- Some(sources.LessThanOrEqual(a.name, v))
- case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
- Some(sources.GreaterThanOrEqual(a.name, v))
-
- case others =>
- if (!or) {
- others.collect {
- case attr: AttributeReference =>
- attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
- }
- }
- None
- }
- }
- filters.flatMap(translate(_, false)).toArray
- }
-
- def processExpression(exprs: Seq[Expression],
- attributesNeedToDecode: java.util.HashSet[AttributeReference],
- unprocessedExprs: ArrayBuffer[Expression],
- carbonTable: CarbonTable): Option[CarbonExpression] = {
- def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
- expr match {
- case or@ Or(left, right) =>
- val leftFilter = transformExpression(left, true)
- val rightFilter = transformExpression(right, true)
- if (leftFilter.isDefined && rightFilter.isDefined) {
- Some(new OrExpression(leftFilter.get, rightFilter.get))
- } else {
- or.collect {
- case attr: AttributeReference => attributesNeedToDecode.add(attr)
- }
- unprocessedExprs += or
- None
- }
-
- case And(left, right) =>
- (transformExpression(left) ++ transformExpression(right)).reduceOption(new
- AndExpression(_, _))
-
- case EqualTo(a: Attribute, l@Literal(v, t)) => new
- Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
- case EqualTo(l@Literal(v, t), a: Attribute) => new
- Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
- case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
- Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
- case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
- Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-
- case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
- Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
- case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
- Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
- case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
- Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
- case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
- Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
- case IsNotNull(child: Attribute) =>
- Some(new NotEqualsExpression(transformExpression(child).get,
- transformExpression(Literal(null)).get, true))
- case IsNull(child: Attribute) =>
- Some(new EqualToExpression(transformExpression(child).get,
- transformExpression(Literal(null)).get, true))
- case Not(In(a: Attribute, list))
- if !list.exists(!_.isInstanceOf[Literal]) =>
- if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
- Some(new FalseExpression(transformExpression(a).get))
- } else {
- Some(new NotInExpression(transformExpression(a).get,
- new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
- }
- case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
- Some(new InExpression(transformExpression(a).get,
- new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
- case Not(In(Cast(a: Attribute, _), list))
- if !list.exists(!_.isInstanceOf[Literal]) =>
- /* if any illogical expression comes in NOT IN Filter like
- NOT IN('scala',NULL) this will be treated as false expression and will
- always return no result. */
- if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
- Some(new FalseExpression(transformExpression(a).get))
- } else {
- Some(new NotInExpression(transformExpression(a).get, new ListExpression(
- convertToJavaList(list.map(transformExpression(_).get)))))
- }
- case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
- Some(new InExpression(transformExpression(a).get,
- new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-
- case GreaterThan(a: Attribute, l@Literal(v, t)) =>
- Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
- case GreaterThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
- Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
- case GreaterThan(l@Literal(v, t), a: Attribute) =>
- Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
- case GreaterThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
- Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-
- case LessThan(a: Attribute, l@Literal(v, t)) =>
- Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
- case LessThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
- Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
- case LessThan(l@Literal(v, t), a: Attribute) =>
- Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
- case LessThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
- Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-
- case GreaterThanOrEqual(a: Attribute, l@Literal(v, t)) =>
- Some(new GreaterThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case GreaterThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
- Some(new GreaterThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case GreaterThanOrEqual(l@Literal(v, t), a: Attribute) =>
- Some(new LessThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case GreaterThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
- Some(new LessThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
-
- case LessThanOrEqual(a: Attribute, l@Literal(v, t)) =>
- Some(new LessThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case LessThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
- Some(new LessThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case LessThanOrEqual(l@Literal(v, t), a: Attribute) =>
- Some(new GreaterThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
- case LessThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
- Some(new GreaterThanEqualToExpression(transformExpression(a).get,
- transformExpression(l).get))
-
- case AttributeReference(name, dataType, _, _) =>
- Some(new CarbonColumnExpression(name,
- CarbonScalaUtil.convertSparkToCarbonDataType(
- getActualCarbonDataType(name, carbonTable))))
- case Literal(name, dataType) => Some(new
- CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
- case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
- case others =>
- if (!or) {
- others.collect {
- case attr: AttributeReference => attributesNeedToDecode.add(attr)
- }
- unprocessedExprs += others
- }
- None
- }
- }
- exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
- }
- private def isNullLiteral(exp: Expression): Boolean = {
- if (null != exp
- && exp.isInstanceOf[Literal]
- && (exp.asInstanceOf[Literal].dataType == org.apache.spark.sql.types.DataTypes.NullType)
- || (exp.asInstanceOf[Literal].value == null)) {
- true
- } else {
- false
- }
- }
- private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
- var carbonColumn: CarbonColumn =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
- val dataType = if (carbonColumn != null) {
- carbonColumn.getDataType
- } else {
- carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
- carbonColumn.getDataType match {
- case DataType.INT => DataType.LONG
- case DataType.LONG => DataType.LONG
- case DataType.DECIMAL => DataType.DECIMAL
- case _ => DataType.DOUBLE
- }
- }
- CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
- }
-
- // Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is
- // not able find the classes inside scala list and gives ClassNotFoundException.
- private def convertToJavaList(
- scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
- val javaList = new java.util.ArrayList[CarbonExpression]()
- scalaList.foreach(javaList.add)
- javaList
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
deleted file mode 100644
index e8bc97e..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-/**
- * Contains all options for Spark data source
- */
-class CarbonOption(options: Map[String, String]) {
- def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
-
- def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-
- def tableName: String = options.getOrElse("tableName", "default_table")
-
- def tableId: String = options.getOrElse("tableId", "default_table_id")
-
- def partitionCount: String = options.getOrElse("partitionCount", "1")
-
- def partitionClass: String = {
- options.getOrElse("partitionClass",
- "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
- }
-
- def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean
-
- def compress: Boolean = options.getOrElse("compress", "false").toBoolean
-
- def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
deleted file mode 100644
index 7618558..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
-
-
- /**
- * Column validator
- */
-trait ColumnValidator {
- def validateColumns(columns: Seq[ColumnSchema])
-}
-/**
- * Dictionary related helper service
- */
-trait DictionaryDetailService {
- def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
- table: CarbonTableIdentifier, storePath: String): DictionaryDetail
-}
-
-/**
- * Dictionary related detail
- */
-case class DictionaryDetail(columnIdentifiers: Array[ColumnIdentifier],
- dictFilePaths: Array[String], dictFileExists: Array[Boolean])
-
-/**
- * Factory class
- */
-object CarbonSparkFactory {
- /**
- * @return column validator
- */
- def getCarbonColumnValidator(): ColumnValidator = {
- new CarbonColumnValidator
- }
-
- /**
- * @return dictionary helper
- */
- def getDictionaryDetailService(): DictionaryDetailService = {
- new DictionaryDetailHelper
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
deleted file mode 100644
index 52457b8..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import scala.collection.mutable.HashMap
-
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.datastorage.store.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-
-class DictionaryDetailHelper extends DictionaryDetailService {
- def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
- table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
- val dictFilePaths = new Array[String](primDimensions.length)
- val dictFileExists = new Array[Boolean](primDimensions.length)
- val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
-
- val fileType = FileFactory.getFileType(dictfolderPath)
- // Metadata folder
- val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
- // need to list all dictionary file paths, each with an exists flag
- val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
- @Override def accept(pathname: CarbonFile): Boolean = {
- CarbonTablePath.isDictionaryFile(pathname)
- }
- })
- // 2 put dictionary file names to fileNamesMap
- val fileNamesMap = new HashMap[String, Int]
- for (i <- 0 until carbonFiles.length) {
- fileNamesMap.put(carbonFiles(i).getName, i)
- }
- // 3 look up fileNamesMap: the dictionary file exists iff its name is in the map
- primDimensions.zipWithIndex.foreach { f =>
- columnIdentifier(f._2) = f._1.getColumnIdentifier
- dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
- dictFileExists(f._2) =
- fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
- case None => false
- case Some(_) => true
- }
- }
-
- DictionaryDetail(columnIdentifier, dictFilePaths, dictFileExists)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
deleted file mode 100644
index 254052b..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * It is just a key-value class. I did not find any other alternative to make the RDD class
- * work with my minimal knowledge of Scala.
- * Maybe I will remove it later once I gain good knowledge :)
- *
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.load.LoadMetadataDetails
-
-trait Value[V] extends Serializable {
- def getValue(value: Array[Object]): V
-}
-
-class ValueImpl extends Value[Array[Object]] {
- override def getValue(value: Array[Object]): Array[Object] = value
-}
-
-trait RawValue[V] extends Serializable {
- def getValue(value: Array[Any]): V
-}
-
-class RawValueImpl extends RawValue[Array[Any]] {
- override def getValue(value: Array[Any]): Array[Any] = value
-}
-
-trait DataLoadResult[K, V] extends Serializable {
- def getKey(key: String, value: LoadMetadataDetails): (K, V)
-}
-
-class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
- override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
- (key, value)
- }
-}
-
-
-trait PartitionResult[K, V] extends Serializable {
- def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class PartitionResultImpl extends PartitionResult[Int, Boolean] {
- override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait MergeResult[K, V] extends Serializable {
- def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class MergeResultImpl extends MergeResult[Int, Boolean] {
- override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait DeletedLoadResult[K, V] extends Serializable {
- def getKey(key: String, value: String): (K, V)
-}
-
-class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
- override def getKey(key: String, value: String): (String, String) = (key, value)
-}
-
-trait RestructureResult[K, V] extends Serializable {
- def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class RestructureResultImpl extends RestructureResult[Int, Boolean] {
- override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
deleted file mode 100644
index 551fc9c..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.sql.readers
-
-/**
- * Parser for parsing lines in bulk. Use this when efficiency is desired.
- *
- * @param iter iterator over lines in the file
- * @param fieldSep the delimiter used to separate fields in a line
- * @param lineSep the delimiter used to separate lines
- * @param quote character used to quote fields
- * @param escape character used to escape the quote character
- * @param ignoreLeadingSpace ignore white space before a field
- * @param ignoreTrailingSpace ignore white space after a field
- * @param headers headers for the columns
- * @param inputBufSize size of buffer to use for parsing input, tune for performance
- * @param maxCols maximum number of columns allowed, for safety against bad inputs
- */
-class CarbonBulkCsvReader (iter: Iterator[String],
- split: Int,
- fieldSep: Char = ',',
- lineSep: String = "\n",
- quote: Char = '"',
- escape: Char = '\\',
- commentMarker: Char = '#',
- ignoreLeadingSpace: Boolean = true,
- ignoreTrailingSpace: Boolean = true,
- headers: Seq[String],
- inputBufSize: Int = 128,
- maxCols: Int = 20480)
- extends CsvReader(fieldSep,
- lineSep,
- quote,
- escape,
- commentMarker,
- ignoreLeadingSpace,
- ignoreTrailingSpace,
- headers,
- inputBufSize,
- maxCols)
- with Iterator[Array[String]] {
-
- private val reader = new CarbonStringIteratorReader(iter)
- parser.beginParsing(reader)
- private var nextRecord = parser.parseNext()
-
- /**
- * get the next parsed line.
- *
- * @return array of strings where each string is a field in the CSV record
- */
- def next: Array[String] = {
- val curRecord = nextRecord
- if(curRecord != null) {
- nextRecord = parser.parseNext()
- } else {
- throw new NoSuchElementException("next record is null")
- }
- curRecord
- }
-
- def hasNext: Boolean = nextRecord != null
-
-}
-
-/**
- * A Reader that "reads" from a sequence of lines. Spark's textFile method removes newlines at
- * the end of each line; the Univocity parser requires a Reader that provides access to the data
- * to be parsed and needs the newlines to be present.
- * @param iter iterator over RDD[String]
- */
-private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
-
- private var next: Long = 0
- private var length: Long = 0 // length of input so far
- private var start: Long = 0
- private var str: String = null // current string from iter
-
- /**
- * fetch next string from iter, if done with current one
- * pretend there is a new line at the end of every string we get from iter (except the last)
- */
- private def refill(): Unit = {
- if (length == next) {
- if (iter.hasNext) {
- str = iter.next
- start = length
- // add a space to every line except the last one to store '\n'
- if (iter.hasNext) {
- length += (str.length + 1) // allowance for newline removed by SparkContext.textFile()
- } else {
- length += str.length
- }
- } else {
- str = null
- }
- }
- }
-
- /**
- * read the next character, if at end of string pretend there is a new line
- */
- override def read(): Int = {
- refill()
- if(next >= length) {
- -1
- } else {
- val cur = next - start
- next += 1
- if (cur == str.length) '\n' else str.charAt(cur.toInt)
- }
- }
-
- /**
- * read from str into cbuf
- */
- def read(cbuf: Array[Char], off: Int, len: Int): Int = {
- refill()
- var n = 0
- if ((off < 0) || (off > cbuf.length) || (len < 0) ||
- ((off + len) > cbuf.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException()
- } else if (len == 0) {
- n = 0
- } else {
- if (next >= length) { // end of input
- n = -1
- } else {
- n = Math.min(length - next, len).toInt // lesser of amount of input available or buf size
- // add a '\n' to every line except the last one
- if (n == length - next && iter.hasNext) {
- str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off)
- cbuf(off + n - 1) = '\n'
- } else {
- str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off)
- }
- next += n
- if (n < len) {
- val m = read(cbuf, off + n, len - n) // have more space, fetch more input from iter
- if(m != -1) n += m
- }
- }
- }
- n
- }
-
- override def skip(ns: Long): Long = {
- throw new IllegalArgumentException("Skip not implemented")
- }
-
- override def ready: Boolean = {
- refill()
- true
- }
-
- override def markSupported: Boolean = false
-
- override def mark(readAheadLimit: Int): Unit = {
- throw new IllegalArgumentException("Mark not implemented")
- }
-
- override def reset(): Unit = {
- throw new IllegalArgumentException("Mark and hence reset not implemented")
- }
-
- def close(): Unit = { }
-}