Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:44 UTC

[06/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
deleted file mode 100644
index 61639d3..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Partition extends Serializable {
-  /**
-   * unique identification for the partition in the cluster.
-   */
-  String getUniqueID();
-
-  /**
-   * File path for the raw data represented by this partition
-   */
-  String getFilePath();
-
-  /**
-   * File paths of the raw data represented by this partition.
-   *
-   * @return the list of file paths
-   */
-  List<String> getFilesPath();
-}
\ No newline at end of file
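
Note: the removed Partition contract distinguishes a single raw-data path
(getFilePath) from a list of paths (getFilesPath), and the implementations in
this commit each fill only one of the two. A hypothetical consumer, shown here
only as a sketch (PartitionPrinter is not part of the commit), would handle
both shapes:

import java.util.Collections;
import java.util.List;

import org.apache.carbondata.spark.partition.api.Partition;

final class PartitionPrinter {
  static void print(List<Partition> partitions) {
    for (Partition p : partitions) {
      List<String> files = p.getFilesPath();
      if (files == null) {
        // single-location implementations (e.g. PartitionImpl below) return null here
        files = Collections.singletonList(p.getFilePath());
      }
      System.out.println(p.getUniqueID() + " -> " + files);
    }
  }
}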

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
deleted file mode 100644
index bc6e54f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DataPartitionerProperties {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
-
-  private static DataPartitionerProperties instance;
-
-  private Properties properties;
-
-  private DataPartitionerProperties() {
-    properties = loadProperties();
-  }
-
-  public static DataPartitionerProperties getInstance() {
-    if (instance == null) {
-      instance = new DataPartitionerProperties();
-    }
-    return instance;
-  }
-
-  public String getValue(String key, String defaultVal) {
-    return properties.getProperty(key, defaultVal);
-  }
-
-  public String getValue(String key) {
-    return properties.getProperty(key);
-  }
-
-  /**
-   * Read the properties from DataPartitioner.properties
-   */
-  private Properties loadProperties() {
-    Properties props = new Properties();
-
-    File file = new File("DataPartitioner.properties");
-    FileInputStream fis = null;
-    try {
-      if (file.exists()) {
-        fis = new FileInputStream(file);
-
-        props.load(fis);
-      }
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    } finally {
-      if (null != fis) {
-        try {
-          fis.close();
-        } catch (IOException e) {
-          LOGGER.error(e, e.getMessage());
-        }
-      }
-    }
-
-    return props;
-  }
-}
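
Note: the removed getInstance() creates the singleton lazily without any
synchronization, so concurrent first calls can construct two instances, and
loadProperties() closes the stream by hand. A thread-safe equivalent of the
same class shape, as a sketch only (logging elided, behavior otherwise kept):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public final class DataPartitionerProperties {
  private final Properties properties = loadProperties();

  private DataPartitionerProperties() {
  }

  // Initialization-on-demand holder: the JVM initializes Holder at most once,
  // lazily, on the first call to getInstance(), with no explicit locking.
  private static final class Holder {
    static final DataPartitionerProperties INSTANCE = new DataPartitionerProperties();
  }

  public static DataPartitionerProperties getInstance() {
    return Holder.INSTANCE;
  }

  public String getValue(String key, String defaultVal) {
    return properties.getProperty(key, defaultVal);
  }

  private Properties loadProperties() {
    Properties props = new Properties();
    File file = new File("DataPartitioner.properties");
    if (file.exists()) {
      // try-with-resources closes the stream even when load() throws
      try (FileInputStream fis = new FileInputStream(file)) {
        props.load(fis);
      } catch (IOException e) {
        // swallow and fall back to empty properties, as the removed code did
      }
    }
    return props;
  }
}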

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
deleted file mode 100644
index 9bee8a2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-/**
- * A sample load balancer that distributes partitions across the available nodes in round-robin order.
- */
-public class DefaultLoadBalancer {
-  private Map<String, List<Partition>> nodeToPartitonMap =
-      new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private Map<Partition, String> partitonToNodeMap =
-      new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
-    // Perform a round-robin allocation
-    int nodeCount = nodes.size();
-
-    int partitioner = 0;
-    for (Partition partition : partitions) {
-      int nodeindex = partitioner % nodeCount;
-      String node = nodes.get(nodeindex);
-
-      List<Partition> oldList = nodeToPartitonMap.get(node);
-      if (oldList == null) {
-        oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-        nodeToPartitonMap.put(node, oldList);
-      }
-      oldList.add(partition);
-
-      partitonToNodeMap.put(partition, node);
-
-      partitioner++;
-    }
-  }
-
-  public List<Partition> getPartitionsForNode(String node) {
-    return nodeToPartitonMap.get(node);
-  }
-
-  public String getNodeForPartitions(Partition partition) {
-    return partitonToNodeMap.get(partition);
-  }
-}
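
Note: to make the round-robin assignment concrete, with three nodes and five
partitions the indices map as i % 3, so partitions 0..4 land on nodes 0, 1, 2,
0, 1. A usage sketch (node names and paths are made up; PartitionImpl is the
implementation removed further down in this commit):

import java.util.Arrays;
import java.util.List;

public class LoadBalancerDemo {
  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("node-a", "node-b", "node-c");
    List<Partition> parts = Arrays.<Partition>asList(
        new PartitionImpl("0", "/data/0"),
        new PartitionImpl("1", "/data/1"),
        new PartitionImpl("2", "/data/2"),
        new PartitionImpl("3", "/data/3"),
        new PartitionImpl("4", "/data/4"));

    DefaultLoadBalancer lb = new DefaultLoadBalancer(nodes, parts);
    // partition i is assigned to nodes.get(i % 3), so partition 3 wraps to node-a
    System.out.println(lb.getNodeForPartitions(parts.get(3)));  // node-a
    System.out.println(lb.getPartitionsForNode("node-b"));      // partitions 1 and 4
  }
}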

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
deleted file mode 100644
index bd7cc42..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionImpl implements Partition {
-  private static final long serialVersionUID = 3020172346383028547L;
-  private String uniqueID;
-  private String folderPath;
-
-
-  public PartitionImpl(String uniqueID, String folderPath) {
-    this.uniqueID = uniqueID;
-    this.folderPath = folderPath;
-  }
-
-  @Override public String getUniqueID() {
-    return uniqueID;
-  }
-
-  @Override public String getFilePath() {
-    return folderPath;
-  }
-
-  @Override public String toString() {
-    return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
-  }
-
-  @Override public List<String> getFilesPath() {
-    // single-location partition: no multi-file path list
-    return null;
-  }
-
-}
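
Note: PartitionImpl does not override equals()/hashCode(), while
DefaultLoadBalancer above keys a HashMap on Partition, so lookups work by
object identity only; a partition deserialized elsewhere would not match. If
value semantics based on the unique ID were wanted, a minimal sketch would be:

@Override public boolean equals(Object obj) {
  if (this == obj) {
    return true;
  }
  if (!(obj instanceof PartitionImpl)) {
    return false;
  }
  return uniqueID.equals(((PartitionImpl) obj).uniqueID);
}

@Override public int hashCode() {
  return uniqueID.hashCode();
}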

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
deleted file mode 100644
index de32b5c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionMultiFileImpl implements Partition {
-  private static final long serialVersionUID = -4363447826181193976L;
-  private String uniqueID;
-  private List<String> folderPath;
-
-  public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
-    this.uniqueID = uniqueID;
-    this.folderPath = folderPath;
-  }
-
-  @Override public String getUniqueID() {
-    // TODO Auto-generated method stub
-    return uniqueID;
-  }
-
-  @Override public String getFilePath() {
-    // multi-file partition: no single file path
-    return null;
-  }
-
-  @Override public List<String> getFilesPath() {
-    // TODO Auto-generated method stub
-    return folderPath;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
deleted file mode 100644
index e05be7d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-
-public final class QueryPartitionHelper {
-  private static QueryPartitionHelper instance = new QueryPartitionHelper();
-  private Map<String, DataPartitioner> partitionerMap =
-      new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  private Map<String, DefaultLoadBalancer> loadBalancerMap =
-      new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private QueryPartitionHelper() {
-
-  }
-
-  public static QueryPartitionHelper getInstance() {
-    return instance;
-  }
-
-  /**
-   * Get the partitions applicable for the query, based on the filters applied in the query.
-   */
-  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
-    String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    List<Partition> queryPartitions = dataPartitioner.getPartitions(queryPlan);
-    return queryPartitions;
-  }
-
-  public List<Partition> getAllPartitions(String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    return dataPartitioner.getAllPartitions();
-  }
-
-  /**
-   * Get the name of the node to which the partition is assigned.
-   */
-  public String getLocation(Partition partition, String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
-
-    DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
-    return loadBalancer.getNodeForPartitions(partition);
-  }
-
-}
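
Note: QueryPartitionHelper assumes a DataPartitioner (and a DefaultLoadBalancer)
was already registered for the table; when partitionerMap.get(tableUniqueName)
returns null the next call fails with a bare NullPointerException. A defensive
variant of the lookup, shown as a sketch only:

public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
  String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
  DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
  if (dataPartitioner == null) {
    // fail fast with context instead of a bare NullPointerException
    throw new IllegalStateException("No DataPartitioner registered for " + tableUniqueName);
  }
  return dataPartitioner.getPartitions(queryPlan);
}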

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
deleted file mode 100644
index c9b434a..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-import org.apache.spark.sql.execution.command.Partitioner;
-
-/**
- * Sample data partitioner implementation.
- */
-public class SampleDataPartitionerImpl implements DataPartitioner {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SampleDataPartitionerImpl.class.getName());
-  private int numberOfPartitions = 1;
-
-  private int partionColumnIndex = -1;
-
-  private String partitionColumn;
-
-  private Partitioner partitioner;
-  private List<Partition> allPartitions;
-  private String baseLocation;
-
-  public SampleDataPartitionerImpl() {
-  }
-
-  public void initialize(String basePath, String[] columns, Partitioner partitioner) {
-    this.partitioner = partitioner;
-    numberOfPartitions = partitioner.partitionCount();
-
-    partitionColumn = partitioner.partitionColumn()[0];
-    LOGGER.info("SampleDataPartitionerImpl initializing with following properties.");
-    LOGGER.info("partitionCount: " + numberOfPartitions);
-    LOGGER.info("partitionColumn: " + partitionColumn);
-    LOGGER.info("basePath: " + basePath);
-    LOGGER.info("columns: " + Arrays.toString(columns));
-
-    this.baseLocation = basePath;
-    allPartitions = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-    for (int i = 0; i < columns.length; i++) {
-      if (columns[i].equalsIgnoreCase(partitionColumn)) {
-        partionColumnIndex = i;
-        break;
-      }
-    }
-
-    for (int partionCounter = 0; partionCounter < numberOfPartitions; partionCounter++) {
-      PartitionImpl partitionImpl =
-          new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter);
-
-      List<Object> includedHashes = new ArrayList<Object>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      includedHashes.add(partionCounter);
-
-      allPartitions.add(partitionImpl);
-    }
-  }
-
-  @Override public Partition getPartionForTuple(Object[] tuple, long rowCounter) {
-    int hashCode;
-    if (partionColumnIndex == -1) {
-      hashCode = hashCode(rowCounter);
-    } else {
-      try {
-        hashCode = hashCode(((String) tuple[partionColumnIndex]).hashCode());
-      } catch (NumberFormatException e) {
-        hashCode = hashCode(0);
-      }
-    }
-    return allPartitions.get(hashCode);
-  }
-
-  /**
-   * Returns all partitions known to this partitioner.
-   */
-  public List<Partition> getAllPartitions() {
-    return allPartitions;
-  }
-
-  /**
-   * @see DataPartitioner#getPartitions(CarbonQueryPlan)
-   */
-  public List<Partition> getPartitions(CarbonQueryPlan queryPlan) {
-    // TODO: this has to be redone during the partitioning implementation
-    return allPartitions;
-  }
-
-  /**
-   * Identify the partitions applicable for the given filter
-   */
-  public List<Partition> getPartitions() {
-    return allPartitions;
-
-    // TODO: this has to be redone during partitioning implementation
-    //    for (Partition aPartition : allPartitions) {
-    //      CarbonDimensionLevelFilter partitionFilterDetails =
-    //          aPartition.getPartitionDetails().get(partitionColumn);
-    //
-    //      //Check if the partition is serving any of the
-    //      //hash code generated for include filter of query
-    //      for (Object includeFilter : msisdnFilter.getIncludeFilter()) {
-    //        int hashCode = hashCode(((String) includeFilter).hashCode());
-    //        if (partitionFilterDetails.getIncludeFilter().contains(hashCode)) {
-    //          allowedPartitions.add(aPartition);
-    //          break;
-    //        }
-    //      }
-    //    }
-
-  }
-
-  private int hashCode(long key) {
-    return (int) (Math.abs(key) % numberOfPartitions);
-  }
-
-  @Override public String[] getPartitionedColumns() {
-    return new String[] { partitionColumn };
-  }
-
-  @Override public Partitioner getPartitioner() {
-    return partitioner;
-  }
-
-}
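
Note: the routing in getPartionForTuple reduces to abs(hash) % partitionCount.
With 4 partitions, a partition-column value hashing to -7 maps to abs(-7) % 4
= 3, and rows without a partition column fall back to the running row counter,
which is plain round robin. Spelled out as a sketch:

// Routing used by the removed getPartionForTuple, for 4 partitions.
int numberOfPartitions = 4;

// With a partition column: route on the column value's String hash code.
long keyHash = "someValue".hashCode();
int idx = (int) (Math.abs(keyHash) % numberOfPartitions);

// Without a partition column: the row counter degenerates to round robin.
long rowCounter = 10;
int fallbackIdx = (int) (Math.abs(rowCounter) % numberOfPartitions);  // 10 % 4 = 2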

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
deleted file mode 100644
index bb8fc5c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.readsupport;
-
-import java.sql.Timestamp;
-
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
-
-  @Override public void initialize(CarbonColumn[] carbonColumns,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    super.initialize(carbonColumns, absoluteTableIdentifier);
-    //can initialize and generate schema here.
-  }
-
-  @Override public Row readRow(Object[] data) {
-    for (int i = 0; i < dictionaries.length; i++) {
-      if (dictionaries[i] != null) {
-        data[i] = DataTypeUtil
-            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
-                dataTypes[i]);
-        switch (dataTypes[i]) {
-          case STRING:
-            data[i] = UTF8String.fromString(data[i].toString());
-            break;
-          case TIMESTAMP:
-            data[i] = new Timestamp((long) data[i] / 1000);
-            break;
-          case LONG:
-            data[i] = data[i];
-            break;
-          default:
-        }
-      } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        //convert the long to timestamp in case of direct dictionary column
-        if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
-          data[i] = new Timestamp((long) data[i] / 1000);
-        }
-      }
-    }
-    return new GenericRow(data);
-  }
-}
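
Note: readRow() divides the stored long by 1000 before building
java.sql.Timestamp, whose constructor takes milliseconds; that implies the
column value is held at microsecond granularity (an inference from the code,
not stated anywhere in this commit). In isolation:

// Assumed: storedMicros is microseconds since the epoch (inferred from the /1000).
long storedMicros = 1480492304000000L;
java.sql.Timestamp ts = new java.sql.Timestamp(storedMicros / 1000);  // millis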

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java b/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
deleted file mode 100644
index 3fb24e2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- *
- */
-package org.apache.carbondata.spark.splits;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-import org.apache.hadoop.io.Writable;
-
-
-/**
- * It represents one region server as one split.
- */
-public class TableSplit implements Serializable, Writable {
-  private static final long serialVersionUID = -8058151330863145575L;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(TableSplit.class.getName());
-  private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-  private Partition partition;
-
-  /**
-   * @return the locations
-   */
-  public List<String> getLocations() {
-    return locations;
-  }
-
-  /**
-   * @param locations the locations to set
-   */
-  public void setLocations(List<String> locations) {
-    this.locations = locations;
-  }
-
-  /**
-   * @return the partition.
-   */
-  public Partition getPartition() {
-    return partition;
-  }
-
-  /**
-   * @param partition the partition to set.
-   */
-  public void setPartition(Partition partition) {
-    this.partition = partition;
-  }
-
-  @Override public void readFields(DataInput in) throws IOException {
-
-    int sizeLoc = in.readInt();
-    for (int i = 0; i < sizeLoc; i++) {
-      byte[] b = new byte[in.readInt()];
-      in.readFully(b);
-      locations.add(new String(b, Charset.defaultCharset()));
-    }
-
-    byte[] buf = new byte[in.readInt()];
-    in.readFully(buf);
-    ByteArrayInputStream bis = new ByteArrayInputStream(buf);
-    ObjectInputStream ois = new ObjectInputStream(bis);
-    try {
-      partition = (Partition) ois.readObject();
-    } catch (ClassNotFoundException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    ois.close();
-  }
-
-  @Override public void write(DataOutput out) throws IOException {
-
-    int sizeLoc = locations.size();
-    out.writeInt(sizeLoc);
-    for (int i = 0; i < sizeLoc; i++) {
-      byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset());
-      out.writeInt(bytes.length);
-      out.write(bytes);
-    }
-
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
-    ObjectOutputStream obs = new ObjectOutputStream(bos);
-    obs.writeObject(partition);
-    obs.close();
-    byte[] byteArray = bos.toByteArray();
-    out.writeInt(byteArray.length);
-    out.write(byteArray);
-  }
-
-  public String toString() {
-    return partition.getUniqueID() + ' ' + locations;
-  }
-}
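
Note: the Writable layout produced by write() and consumed by readFields() is,
in order: an int location count; for each location, an int byte length and the
bytes; then an int length and the Java-serialized Partition. A round-trip
sketch against that layout (the helper name is illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// [int n][n x (int len, byte[len] location)][int m][byte[m] serialized Partition]
static TableSplit roundTrip(TableSplit split) throws IOException {
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  split.write(new DataOutputStream(bos));

  TableSplit copy = new TableSplit();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
  return copy;  // same locations and partition as the input
}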

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
deleted file mode 100644
index d2e716f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
-import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
-import org.apache.carbondata.spark.splits.TableSplit;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * This utility parses the Carbon query plan into the actual query model object.
- */
-public final class CarbonQueryUtil {
-
-  private CarbonQueryUtil() {
-
-  }
-
-  /**
-   * It creates one split for each region server.
-   */
-  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) throws IOException {
-
-    // Just create splits based on the locations of the region servers
-    List<Partition> allPartitions = null;
-    if (queryPlan == null) {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
-    } else {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
-    }
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = QueryPartitionHelper.getInstance()
-          .getLocation(partition, databaseName, tableName);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-
-    return splits;
-  }
-
-  /**
-   * It creates one split for each region server.
-   */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
-
-    // Just create splits based on the locations of the region servers
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
-    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
-    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < tblSplits.length; i++) {
-      tblSplits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      tblSplits[i].setPartition(partition);
-      tblSplits[i].setLocations(locations);
-    }
-    return tblSplits;
-  }
-
-  /**
-   * Split sourcePath by the given separator.
-   */
-  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
-      String separator) {
-    if (StringUtils.isNotEmpty(sourcePath)) {
-      String[] files = sourcePath.split(separator);
-      for (String file : files) {
-        partitionsFiles.add(file);
-      }
-    }
-  }
-
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
-    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
-    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
-    for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(i % 1).add(files.get(i));
-    }
-    return partitionList;
-  }
-
-  public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
-    List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    if (null != details) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
-          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
-          slices.add(loadName);
-        }
-      }
-    }
-    return slices;
-  }
-
-}
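
Note: two quirks in the removed code are worth flagging. In
getAllFilesForDataLoad the bucket index i % 1 is always 0, so a comma-separated
source path always collapses into the single partition "0"; and
getTableSplitsForDirectLoad hands DefaultLoadBalancer an empty node list, so
the constructor's partitioner % nodeCount appears to divide by zero as soon as
one partition exists. The one-bucket grouping, spelled out as a sketch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

List<String> files = Arrays.asList("/tmp/f1.csv", "/tmp/f2.csv", "/tmp/f3.csv");
List<String> bucketZero = new ArrayList<String>();
for (int i = 0; i < files.size(); i++) {
  // i % 1 == 0 for every i, so all files land in the same bucket
  bucketZero.add(files.get(i));
}
// bucketZero holds all three paths: one PartitionMultiFileImpl("0", bucketZero)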

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
deleted file mode 100644
index 11cf9f8..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name  : Carbon
- * Module Name   : CARBON Data Processor
- * Author    : R00903928
- * Created Date  : 15-Sep-2015
- * FileName   : LoadMetadataUtil.java
- * Description   : Utility methods for load metadata handling
- * Class Version  : 1.0
- */
-package org.apache.carbondata.spark.util;
-
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-
-public final class LoadMetadataUtil {
-  private LoadMetadataUtil() {
-
-  }
-
-  public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) {
-    CarbonTable table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-        .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
-
-    String metaDataLocation = table.getMetaDataFilepath();
-    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    if (details != null && details.length != 0) {
-      for (LoadMetadataDetails oneRow : details) {
-        if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
-            || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
-            && oneRow.getVisibility().equalsIgnoreCase("true")) {
-          return true;
-        }
-      }
-    }
-
-    return false;
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
deleted file mode 100644
index ea97bca..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
- /**
-  * Carbon column validator
-  */
-class CarbonColumnValidator extends ColumnValidator {
-  def validateColumns(allColumns: Seq[ColumnSchema]) {
-    allColumns.foreach { columnSchema =>
-      val colWithSameId = allColumns.filter { x =>
-        x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
-      }
-      if (colWithSameId.size > 1) {
-        throw new MalformedCarbonCommandException("Two column can not have same columnId")
-      }
-    }
-  }
-}
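
Note: the validator above re-filters the whole column list for every column,
which is O(n^2). An equivalent single-pass check, sketched in Java to match the
rest of this commit (the Scala original could use
allColumns.groupBy(_.getColumnUniqueId) to the same effect):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

static void validateColumns(List<ColumnSchema> allColumns)
    throws MalformedCarbonCommandException {
  // Set.add returns false on the first repeated columnId
  Set<Object> seen = new HashSet<Object>();
  for (ColumnSchema col : allColumns) {
    if (!seen.add(col.getColumnUniqueId())) {
      throw new MalformedCarbonCommandException("Two columns cannot have the same columnId");
    }
  }
}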

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index a1a2ecb..c464538 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -63,7 +63,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.getInstance(cc).carbonCatalog.storePath
+    val storePath = CarbonEnv.get.carbonMetastore.storePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
deleted file mode 100644
index 3162f80..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper, CarbonAliasDecoderRelation}
-import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.StructType
-
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
-import org.apache.carbondata.scan.expression.conditional._
-import org.apache.carbondata.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * All filter conversions are done here.
- */
-object CarbonFilters {
-
-  /**
-   * Converts data sources filters to carbon filter predicates.
-   */
-  def createCarbonFilter(schema: StructType,
-      predicate: sources.Filter): Option[CarbonExpression] = {
-    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
-
-    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
-      predicate match {
-
-        case sources.EqualTo(name, value) =>
-          Some(new EqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.Not(sources.EqualTo(name, value)) =>
-          Some(new NotEqualsExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-
-        case sources.EqualNullSafe(name, value) =>
-          Some(new EqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.Not(sources.EqualNullSafe(name, value)) =>
-          Some(new NotEqualsExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-
-        case sources.GreaterThan(name, value) =>
-          Some(new GreaterThanExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.LessThan(name, value) =>
-          Some(new LessThanExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.GreaterThanOrEqual(name, value) =>
-          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-        case sources.LessThanOrEqual(name, value) =>
-          Some(new LessThanEqualToExpression(getCarbonExpression(name),
-            getCarbonLiteralExpression(name, value)))
-
-        case sources.In(name, values) =>
-          Some(new InExpression(getCarbonExpression(name),
-            new ListExpression(
-              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
-        case sources.Not(sources.In(name, values)) =>
-          Some(new NotInExpression(getCarbonExpression(name),
-            new ListExpression(
-              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
-
-        case sources.And(lhs, rhs) =>
-          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
-
-        case sources.Or(lhs, rhs) =>
-          for {
-            lhsFilter <- createFilter(lhs)
-            rhsFilter <- createFilter(rhs)
-          } yield {
-            new OrExpression(lhsFilter, rhsFilter)
-          }
-
-        case _ => None
-      }
-    }
-
-    def getCarbonExpression(name: String) = {
-      new CarbonColumnExpression(name,
-        CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
-    }
-
-    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
-      new CarbonLiteralExpression(value,
-        CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
-    }
-
-    createFilter(predicate)
-  }
-
-
-  // Check which filters can be pushed down to Carbon; the rest are handled in the Spark layer.
-  // Mostly only dimension filters are pushed down, since they are faster in Carbon.
-  def selectFilters(filters: Seq[Expression],
-      attrList: java.util.HashSet[AttributeReferenceWrapper],
-      aliasMap: CarbonAliasDecoderRelation): Unit = {
-    def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
-      expr match {
-        case or@ Or(left, right) =>
-
-          val leftFilter = translate(left, or = true)
-          val rightFilter = translate(right, or = true)
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            Some( sources.Or(leftFilter.get, rightFilter.get))
-          } else {
-            or.collect {
-              case attr: AttributeReference =>
-                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-            None
-          }
-
-        case And(left, right) =>
-          (translate(left) ++ translate(right)).reduceOption(sources.And)
-
-        case EqualTo(a: Attribute, Literal(v, t)) =>
-          Some(sources.EqualTo(a.name, v))
-        case EqualTo(l@Literal(v, t), a: Attribute) =>
-          Some(sources.EqualTo(a.name, v))
-        case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
-          Some(sources.EqualTo(a.name, v))
-        case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(sources.EqualTo(a.name, v))
-
-        case Not(EqualTo(a: Attribute, Literal(v, t))) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), a: Attribute)) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
-            Some(sources.Not(sources.EqualTo(a.name, v)))
-        case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
-        case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
-        case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
-          val hSet = list.map(e => e.eval(EmptyRow))
-          Some(sources.Not(sources.In(a.name, hSet.toArray)))
-        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-          val hSet = list.map(e => e.eval(EmptyRow))
-          Some(sources.In(a.name, hSet.toArray))
-        case Not(In(Cast(a: Attribute, _), list))
-          if !list.exists(!_.isInstanceOf[Literal]) =>
-          val hSet = list.map(e => e.eval(EmptyRow))
-          Some(sources.Not(sources.In(a.name, hSet.toArray)))
-        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
-          val hSet = list.map(e => e.eval(EmptyRow))
-          Some(sources.In(a.name, hSet.toArray))
-
-        case GreaterThan(a: Attribute, Literal(v, t)) =>
-          Some(sources.GreaterThan(a.name, v))
-        case GreaterThan(Literal(v, t), a: Attribute) =>
-          Some(sources.LessThan(a.name, v))
-        case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
-          Some(sources.GreaterThan(a.name, v))
-        case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(sources.LessThan(a.name, v))
-
-        case LessThan(a: Attribute, Literal(v, t)) =>
-          Some(sources.LessThan(a.name, v))
-        case LessThan(Literal(v, t), a: Attribute) =>
-          Some(sources.GreaterThan(a.name, v))
-        case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
-          Some(sources.LessThan(a.name, v))
-        case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(sources.GreaterThan(a.name, v))
-
-        case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-        case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-        case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-        case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-
-        case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-        case LessThanOrEqual(Literal(v, t), a: Attribute) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-        case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-        case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-
-        case others =>
-          if (!or) {
-            others.collect {
-              case attr: AttributeReference =>
-                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-          }
-          None
-      }
-    }
-    filters.flatMap(translate(_, false)).toArray
-  }
-
-  def processExpression(exprs: Seq[Expression],
-      attributesNeedToDecode: java.util.HashSet[AttributeReference],
-      unprocessedExprs: ArrayBuffer[Expression],
-      carbonTable: CarbonTable): Option[CarbonExpression] = {
-    def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
-      expr match {
-        case or@ Or(left, right) =>
-          val leftFilter = transformExpression(left, true)
-          val rightFilter = transformExpression(right, true)
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            Some(new OrExpression(leftFilter.get, rightFilter.get))
-          } else {
-            or.collect {
-              case attr: AttributeReference => attributesNeedToDecode.add(attr)
-            }
-            unprocessedExprs += or
-            None
-          }
-
-        case And(left, right) =>
-          (transformExpression(left) ++ transformExpression(right)).reduceOption(new
-              AndExpression(_, _))
-
-        case EqualTo(a: Attribute, l@Literal(v, t)) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), a: Attribute) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
-            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
-            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case IsNotNull(child: Attribute) =>
-            Some(new NotEqualsExpression(transformExpression(child).get,
-             transformExpression(Literal(null)).get, true))
-        case IsNull(child: Attribute) =>
-            Some(new EqualToExpression(transformExpression(child).get,
-             transformExpression(Literal(null)).get, true))
-        case Not(In(a: Attribute, list))
-          if !list.exists(!_.isInstanceOf[Literal]) =>
-          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
-            Some(new FalseExpression(transformExpression(a).get))
-          } else {
-            Some(new NotInExpression(transformExpression(a).get,
-              new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-          }
-        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-          Some(new InExpression(transformExpression(a).get,
-            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-        case Not(In(Cast(a: Attribute, _), list))
-          if list.forall(_.isInstanceOf[Literal]) =>
-          /* an illogical NOT IN filter such as NOT IN('scala', NULL) is
-           treated as a false expression and always returns no rows. */
-          if (list.exists(isNullLiteral)) {
-            Some(new FalseExpression(transformExpression(a).get))
-          } else {
-            Some(new NotInExpression(transformExpression(a).get, new ListExpression(
-              convertToJavaList(list.map(transformExpression(_).get)))))
-          }
-        case In(Cast(a: Attribute, _), list) if list.forall(_.isInstanceOf[Literal]) =>
-          Some(new InExpression(transformExpression(a).get,
-            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-
-        case GreaterThan(a: Attribute, l@Literal(v, t)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(l@Literal(v, t), a: Attribute) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case LessThan(a: Attribute, l@Literal(v, t)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(l@Literal(v, t), a: Attribute) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case GreaterThanOrEqual(a: Attribute, l@Literal(v, t)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(l@Literal(v, t), a: Attribute) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        case LessThanOrEqual(a: Attribute, l@Literal(v, t)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(l@Literal(v, t), a: Attribute) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        case AttributeReference(name, dataType, _, _) =>
-          Some(new CarbonColumnExpression(name,
-            CarbonScalaUtil.convertSparkToCarbonDataType(
-              getActualCarbonDataType(name, carbonTable))))
-        case Literal(value, dataType) =>
-          Some(new CarbonLiteralExpression(value,
-            CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
-        case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
-        case others =>
-          if (!or) {
-            others.collect {
-              case attr: AttributeReference => attributesNeedToDecode.add(attr)
-            }
-            unprocessedExprs += others
-          }
-          None
-      }
-    }
-    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
-  }
-  private def isNullLiteral(exp: Expression): Boolean = {
-    // the disjunction is parenthesized so that the null/Literal guard applies
-    // to both checks and a null `exp` can never reach the value access
-    null != exp && exp.isInstanceOf[Literal] &&
-      (exp.asInstanceOf[Literal].dataType ==
-        org.apache.spark.sql.types.DataTypes.NullType ||
-        exp.asInstanceOf[Literal].value == null)
-  }
-  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
-    var carbonColumn: CarbonColumn =
-      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
-    val dataType = if (carbonColumn != null) {
-      carbonColumn.getDataType
-    } else {
-      carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
-      carbonColumn.getDataType match {
-        case DataType.INT => DataType.LONG
-        case DataType.LONG => DataType.LONG
-        case DataType.DECIMAL => DataType.DECIMAL
-        case _ => DataType.DOUBLE
-      }
-    }
-    CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
-  }
-
-  // Convert a Scala list to a Java list. scalaList.asJava cannot be used here:
-  // while deserializing, the classes backing the Scala wrapper cannot be found
-  // and a ClassNotFoundException is thrown.
-  private def convertToJavaList(
-      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
-    val javaList = new java.util.ArrayList[CarbonExpression]()
-    scalaList.foreach(javaList.add)
-    javaList
-  }
-}

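The processExpression method deleted above rewrites Spark Catalyst predicates into Carbon's expression tree; whenever the literal sits on the left of a comparison, the predicate is re-anchored on the attribute and the comparison direction is reversed (v > col becomes col < v). A minimal sketch of that reversal rule, using stand-in case classes rather than the CarbonData expression API:

    // Stand-in types for illustration; the real code builds Carbon's
    // GreaterThanExpression / LessThanExpression over column and literal nodes.
    sealed trait Expr
    case class Col(name: String) extends Expr
    case class Lit(v: Any) extends Expr
    case class Gt(l: Expr, r: Expr) extends Expr
    case class Lt(l: Expr, r: Expr) extends Expr

    // Normalize a comparison so the column is always on the left, flipping
    // the operator when the literal was on the left.
    def normalize(p: Expr): Expr = p match {
      case Gt(l: Lit, c: Col) => Lt(c, l) // v > col  ==>  col < v
      case Lt(l: Lit, c: Col) => Gt(c, l) // v < col  ==>  col > v
      case other => other
    }
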
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
deleted file mode 100644
index e8bc97e..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-/**
- * Contains all options for Spark data source
- */
-class CarbonOption(options: Map[String, String]) {
-  def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
-
-  def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-
-  def tableName: String = options.getOrElse("tableName", "default_table")
-
-  def tableId: String = options.getOrElse("tableId", "default_table_id")
-
-  def partitionCount: String = options.getOrElse("partitionCount", "1")
-
-  def partitionClass: String = {
-    options.getOrElse("partitionClass",
-      "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
-  }
-
-  def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean
-
-  def compress: Boolean = options.getOrElse("compress", "false").toBoolean
-
-  def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
-}

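The deleted CarbonOption is a thin wrapper that layers defaults over the data source's option map. A usage sketch, assuming the class exactly as shown above:

    // Illustrative only: exercising the accessors of the deleted class.
    val opts = new CarbonOption(Map("tableName" -> "sales", "compress" -> "true"))
    assert(opts.tableName == "sales")
    assert(opts.compress)               // explicitly set above
    assert(opts.useKettle)              // unset, falls back to "true"
    assert(opts.partitionCount == "1")  // unset, falls back to "1"
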
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
deleted file mode 100644
index 7618558..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
-
-
-/**
- * Column validator
- */
-trait ColumnValidator {
-  def validateColumns(columns: Seq[ColumnSchema])
-}
-/**
- * Dictionary related helper service
- */
-trait DictionaryDetailService {
-  def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail
-}
-
-/**
- * Dictionary related detail
- */
-case class DictionaryDetail(columnIdentifiers: Array[ColumnIdentifier],
-    dictFilePaths: Array[String], dictFileExists: Array[Boolean])
-
-/**
- * Factory class
- */
-object CarbonSparkFactory {
-   /**
-    * @return column validator
-    */
-  def getCarbonColumnValidator(): ColumnValidator = {
-    new CarbonColumnValidator
-  }
-
-  /**
-   * @return dictionary helper
-   */
-  def getDictionaryDetailService(): DictionaryDetailService = {
-    new DictionaryDetailHelper
-  }
-}

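CarbonSparkFactory is a plain service locator: callers obtain the column validator or the dictionary detail service without naming the implementation classes. A sketch of typical use, where columnSchemas, dictFolderPath, primDimensions, tableIdentifier and storePath are assumed to be in scope:

    // Illustrative only; all arguments are assumed to exist in the caller.
    val validator = CarbonSparkFactory.getCarbonColumnValidator()
    validator.validateColumns(columnSchemas)

    val detailService = CarbonSparkFactory.getDictionaryDetailService()
    val detail = detailService.getDictionaryDetail(
      dictFolderPath, primDimensions, tableIdentifier, storePath)
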
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
deleted file mode 100644
index 52457b8..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import scala.collection.mutable.HashMap
-
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.datastorage.store.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-
-class DictionaryDetailHelper extends DictionaryDetailService {
-  def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
-    val dictFilePaths = new Array[String](primDimensions.length)
-    val dictFileExists = new Array[Boolean](primDimensions.length)
-    val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
-
-    val fileType = FileFactory.getFileType(dictfolderPath)
-    // 1. get the metadata folder
-    val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
-    // list all dictionary files in it, so existence can be checked per dimension
-    val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
-      override def accept(pathname: CarbonFile): Boolean = {
-        CarbonTablePath.isDictionaryFile(pathname)
-      }
-    })
-    // 2. put the dictionary file names into fileNamesMap
-    val fileNamesMap = new HashMap[String, Int]
-    for (i <- 0 until carbonFiles.length) {
-      fileNamesMap.put(carbonFiles(i).getName, i)
-    }
-    // 3. look up fileNamesMap: a dimension's dictionary file exists
-    //    if and only if its name is present in the map
-    primDimensions.zipWithIndex.foreach { case (dim, i) =>
-      columnIdentifier(i) = dim.getColumnIdentifier
-      dictFilePaths(i) = carbonTablePath.getDictionaryFilePath(dim.getColumnId)
-      dictFileExists(i) =
-        fileNamesMap.contains(CarbonTablePath.getDictionaryFileName(dim.getColumnId))
-    }
-
-    DictionaryDetail(columnIdentifier, dictFilePaths, dictFileExists)
-  }
-}

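DictionaryDetailHelper lists the metadata directory once, indexes the dictionary file names in a HashMap, and then answers each per-dimension existence query with a constant-time lookup instead of touching the file system again. The same pattern in isolation, with generic names rather than CarbonData APIs:

    import scala.collection.mutable

    // Build the name index once from a single directory listing...
    val listed = Seq("0001.dict", "0002.dict")
    val fileNamesMap = mutable.HashMap(listed.zipWithIndex: _*)

    // ...then each existence check is a cheap map lookup.
    def dictFileExists(fileName: String): Boolean = fileNamesMap.contains(fileName)
    assert(dictFileExists("0001.dict"))
    assert(!dictFileExists("0003.dict"))
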
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
deleted file mode 100644
index 254052b..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Simple key-value holder classes. No better alternative was found to make
- * the RDD classes work generically; these may be replaced by a cleaner
- * abstraction later.
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.load.LoadMetadataDetails
-
-trait Value[V] extends Serializable {
-  def getValue(value: Array[Object]): V
-}
-
-class ValueImpl extends Value[Array[Object]] {
-  override def getValue(value: Array[Object]): Array[Object] = value
-}
-
-trait RawValue[V] extends Serializable {
-  def getValue(value: Array[Any]): V
-}
-
-class RawValueImpl extends RawValue[Array[Any]] {
-  override def getValue(value: Array[Any]): Array[Any] = value
-}
-
-trait DataLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: LoadMetadataDetails): (K, V)
-}
-
-class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
-  override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
-    (key, value)
-  }
-}
-
-
-trait PartitionResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class PartitionResultImpl extends PartitionResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait MergeResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-
-}
-
-class MergeResultImpl extends MergeResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait DeletedLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: String): (K, V)
-}
-
-class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
-  override def getKey(key: String, value: String): (String, String) = (key, value)
-}
-
-trait RestructureResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class RestructureResultImpl extends RestructureResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}

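Each trait in the deleted KeyVal.scala pins down the (K, V) pair an RDD emits while letting the producing code stay generic over that shape. A sketch of a generic producer written against one of the traits shown above:

    // Illustrative only: a producer parameterized on the result shape.
    def emitMergeStatus[K, V](result: MergeResult[K, V],
        partitionId: Int, success: Boolean): (K, V) =
      result.getKey(partitionId, success)

    val kv: (Int, Boolean) = emitMergeStatus(new MergeResultImpl(), 0, true)
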
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
deleted file mode 100644
index 551fc9c..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.sql.readers
-
-/**
- * Parser for parsing lines in bulk. Use this when efficiency is desired.
- *
- * @param iter iterator over lines in the file
- * @param fieldSep the delimiter used to separate fields in a line
- * @param lineSep the delimiter used to separate lines
- * @param quote character used to quote fields
- * @param escape character used to escape the quote character
- * @param ignoreLeadingSpace ignore white space before a field
- * @param ignoreTrailingSpace ignore white space after a field
- * @param headers headers for the columns
- * @param inputBufSize size of buffer to use for parsing input, tune for performance
- * @param maxCols maximum number of columns allowed, for safety against bad inputs
- */
-class CarbonBulkCsvReader (iter: Iterator[String],
-    split: Int,
-    fieldSep: Char = ',',
-    lineSep: String = "\n",
-    quote: Char = '"',
-    escape: Char = '\\',
-    commentMarker: Char = '#',
-    ignoreLeadingSpace: Boolean = true,
-    ignoreTrailingSpace: Boolean = true,
-    headers: Seq[String],
-    inputBufSize: Int = 128,
-    maxCols: Int = 20480)
-  extends CsvReader(fieldSep,
-      lineSep,
-      quote,
-      escape,
-      commentMarker,
-      ignoreLeadingSpace,
-      ignoreTrailingSpace,
-      headers,
-      inputBufSize,
-      maxCols)
-    with Iterator[Array[String]] {
-
-  private val reader = new CarbonStringIteratorReader(iter)
-  parser.beginParsing(reader)
-  private var nextRecord = parser.parseNext()
-
-  /**
-   * get the next parsed line.
-   *
-   * @return array of strings where each string is a field in the CSV record
-   */
-  def next: Array[String] = {
-    val curRecord = nextRecord
-    if(curRecord != null) {
-      nextRecord = parser.parseNext()
-    } else {
-      throw new NoSuchElementException("next record is null")
-    }
-    curRecord
-  }
-
-  def hasNext: Boolean = nextRecord != null
-
-}
-
-/**
- * A Reader that "reads" from a sequence of lines. Spark's textFile method
- * removes the newline at the end of each line, but the Univocity parser
- * requires a Reader over the raw data with the newlines present, so this
- * class re-inserts them.
- * @param iter iterator over RDD[String]
- */
-private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
-
-  private var next: Long = 0
-  private var length: Long = 0  // length of input so far
-  private var start: Long = 0
-  private var str: String = null   // current string from iter
-
-  /**
-   * fetch the next string from iter once the current one is consumed;
-   * pretend there is a newline at the end of every string we get from iter
-   */
-  private def refill(): Unit = {
-    if (length == next) {
-      if (iter.hasNext) {
-        str = iter.next
-        start = length
-        // count one extra char per line (except the last) to stand in for '\n'
-        if (iter.hasNext) {
-          length += (str.length + 1) // allowance for newline removed by SparkContext.textFile()
-        } else {
-          length += str.length
-        }
-      } else {
-        str = null
-      }
-    }
-  }
-
-  /**
-   * read the next character; at the end of each string, pretend there is a newline
-   */
-  override def read(): Int = {
-    refill()
-    if(next >= length) {
-      -1
-    } else {
-      val cur = next - start
-      next += 1
-      if (cur == str.length) '\n' else str.charAt(cur.toInt)
-    }
-  }
-
-  /**
-   * read from str into cbuf
-   */
-  def read(cbuf: Array[Char], off: Int, len: Int): Int = {
-    refill()
-    var n = 0
-    if ((off < 0) || (off > cbuf.length) || (len < 0) ||
-      ((off + len) > cbuf.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException()
-    } else if (len == 0) {
-      n = 0
-    } else {
-      if (next >= length) {   // end of input
-        n = -1
-      } else {
-        n = Math.min(length - next, len).toInt // lesser of amount of input available or buf size
-        // add a '\n' to every line except the last one
-        if (n == length - next && iter.hasNext) {
-          str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off)
-          cbuf(off + n - 1) = '\n'
-        } else {
-          str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off)
-        }
-        next += n
-        if (n < len) {
-          val m = read(cbuf, off + n, len - n)  // have more space, fetch more input from iter
-          if(m != -1) n += m
-        }
-      }
-    }
-    n
-  }
-
-  override def skip(ns: Long): Long = {
-    throw new IllegalArgumentException("Skip not implemented")
-  }
-
-  override def ready: Boolean = {
-    refill()
-    true
-  }
-
-  override def markSupported: Boolean = false
-
-  override def mark(readAheadLimit: Int): Unit = {
-    throw new IllegalArgumentException("Mark not implemented")
-  }
-
-  override def reset(): Unit = {
-    throw new IllegalArgumentException("Mark and hence reset not implemented")
-  }
-
-  def close(): Unit = { }
-}
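CarbonStringIteratorReader re-inserts the newlines that SparkContext.textFile() strips, so the Univocity parser sees the whole partition as one character stream. A behavioral sketch (the class is private to its file, so assume same-file access for illustration):

    // Illustrative only: two textFile-style lines read back as one stream.
    val reader = new CarbonStringIteratorReader(Iterator("a,b,c", "1,2,3"))
    val sb = new StringBuilder
    var ch = reader.read()
    while (ch != -1) { sb.append(ch.toChar); ch = reader.read() }
    assert(sb.toString == "a,b,c\n1,2,3") // '\n' restored between lines only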