You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/28 14:13:09 UTC

[carbondata] branch master updated: [CARBONDATA-3364] Support Read from Hive. Queries are giving empty results from hive.

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new fcca6c5  [CARBONDATA-3364] Support Read from Hive. Queries are giving empty results from hive.
fcca6c5 is described below

commit fcca6c5b661ec02adfa17622e980a0c396bd84c2
Author: dhatchayani <dh...@gmail.com>
AuthorDate: Mon Apr 29 18:52:57 2019 +0530

    [CARBONDATA-3364] Support Read from Hive. Queries are giving empty results from hive.
    
    This closes #3192
---
 .../apache/carbondata/examples/HiveExample.scala   | 99 +++++++++++++---------
 .../apache/carbondata/examplesCI/RunExamples.scala |  3 +-
 integration/hive/pom.xml                           |  9 +-
 .../carbondata/hive/CarbonHiveInputSplit.java      |  8 +-
 .../apache/carbondata/hive/CarbonHiveSerDe.java    |  2 +-
 .../carbondata/hive/MapredCarbonInputFormat.java   | 20 ++---
 .../carbondata/hive/MapredCarbonOutputFormat.java  | 12 ++-
 .../{ => test}/server/HiveEmbeddedServer2.java     | 20 ++---
 integration/spark-common-test/pom.xml              |  6 ++
 .../TestCreateHiveTableWithCarbonDS.scala          |  4 +-
 integration/spark-common/pom.xml                   |  5 ++
 .../apache/spark/util/CarbonReflectionUtils.scala  | 17 ++--
 .../spark/util/DictionaryLRUCacheTestCase.scala    |  1 +
 pom.xml                                            |  1 +
 14 files changed, 123 insertions(+), 84 deletions(-)

diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
index b50e763..c043076 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
@@ -19,33 +19,36 @@ package org.apache.carbondata.examples
 import java.io.File
 import java.sql.{DriverManager, ResultSet, Statement}
 
-import org.apache.spark.sql.SparkSession
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.hive.server.HiveEmbeddedServer2
+import org.apache.carbondata.hive.test.server.HiveEmbeddedServer2
 
 // scalastyle:off println
 object HiveExample {
 
   private val driverName: String = "org.apache.hive.jdbc.HiveDriver"
 
-  def main(args: Array[String]) {
-    val carbonSession = ExampleUtils.createCarbonSession("HiveExample")
-    exampleBody(carbonSession, CarbonProperties.getStorePath
-      + CarbonCommonConstants.FILE_SEPARATOR
-      + CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-    carbonSession.stop()
+  val rootPath = new File(this.getClass.getResource("/").getPath
+                          + "../../../..").getCanonicalPath
+  private val targetLoc = s"$rootPath/examples/spark2/target"
+  val metaStoreLoc = s"$targetLoc/metastore_db"
+  val storeLocation = s"$targetLoc/store"
+  val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
+
+  def main(args: Array[String]) {
+    createCarbonTable(storeLocation)
+    readFromHive
     System.exit(0)
   }
 
-  def exampleBody(carbonSession: SparkSession, store: String): Unit = {
-    val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val rootPath = new File(this.getClass.getResource("/").getPath
-      + "../../../..").getCanonicalPath
+  def createCarbonTable(store: String): Unit = {
+
+    val carbonSession = ExampleUtils.createCarbonSession("HiveExample")
 
     carbonSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)
 
@@ -56,14 +59,44 @@ object HiveExample {
          | STORED BY 'carbondata'
        """.stripMargin)
 
+    val inputPath = FileFactory
+      .getUpdatedFilePath(s"$rootPath/examples/spark2/src/main/resources/sample.csv")
+
     carbonSession.sql(
       s"""
-         | LOAD DATA LOCAL INPATH '$rootPath/examples/spark2/src/main/resources/sample.csv'
+         | LOAD DATA LOCAL INPATH '$inputPath'
+         | INTO TABLE HIVE_CARBON_EXAMPLE
+       """.stripMargin)
+
+    carbonSession.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputPath'
          | INTO TABLE HIVE_CARBON_EXAMPLE
        """.stripMargin)
 
     carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
 
+    carbonSession.close()
+
+    // delete the already existing lock on metastore so that new derby instance
+    // for HiveServer can run on the same metastore
+    checkAndDeleteDBLock
+
+  }
+
+  def checkAndDeleteDBLock: Unit = {
+    val dbLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/db.lck")
+    val dbexLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/dbex.lck")
+    if(FileFactory.isFileExist(dbLockPath)) {
+      FileFactory.deleteFile(dbLockPath, FileFactory.getFileType(dbLockPath))
+    }
+    if(FileFactory.isFileExist(dbexLockPath)) {
+      FileFactory.deleteFile(dbexLockPath, FileFactory.getFileType(dbexLockPath))
+    }
+  }
+
+
+  def readFromHive: Unit = {
     try {
       Class.forName(driverName)
     }
@@ -72,37 +105,19 @@ object HiveExample {
         classNotFoundException.printStackTrace()
     }
 
+    // make HDFS writable
+    val path = new Path(targetLoc)
+    val fileSys = path.getFileSystem(FileFactory.getConfiguration)
+    fileSys.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+
     val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
-    hiveEmbeddedServer2.start()
+    hiveEmbeddedServer2.start(targetLoc)
     val port = hiveEmbeddedServer2.getFreePort
     val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
     val statement: Statement = connection.createStatement
 
     logger.info(s"============HIVE CLI IS STARTED ON PORT $port ==============")
 
-    statement.execute(
-      s"""
-         | CREATE TABLE IF NOT EXISTS HIVE_CARBON_EXAMPLE
-         | (ID int, NAME string,SALARY double)
-         | ROW FORMAT SERDE 'org.apache.carbondata.hive.CarbonHiveSerDe'
-         | WITH SERDEPROPERTIES ('mapreduce.input.carboninputformat.databaseName'='default',
-         | 'mapreduce.input.carboninputformat.tableName'='HIVE_CARBON_EXAMPLE')
-       """.stripMargin)
-
-    statement.execute(
-      s"""
-         | ALTER TABLE HIVE_CARBON_EXAMPLE
-         | SET FILEFORMAT
-         | INPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonInputFormat\"
-         | OUTPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonOutputFormat\"
-         | SERDE \"org.apache.carbondata.hive.CarbonHiveSerDe\"
-       """.stripMargin)
-
-    statement
-      .execute(
-        "ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
-          s"'file:///$store/hive_carbon_example' ")
-
     val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")
 
     var rowsFetched = 0
@@ -135,7 +150,7 @@ object HiveExample {
       rowsFetched = rowsFetched + 1
     }
     println(s"******Total Number Of Rows Fetched ****** $rowsFetched")
-    assert(rowsFetched == 2)
+    assert(rowsFetched == 4)
 
     logger.info("Fetching the Individual Columns ")
 
@@ -166,7 +181,7 @@ object HiveExample {
     }
     println(" ********** Total Rows Fetched When Quering The Individual Columns **********" +
       s"$individualColRowsFetched")
-    assert(individualColRowsFetched == 2)
+    assert(individualColRowsFetched == 4)
 
     logger.info("Fetching the Out Of Order Columns ")
 
@@ -200,7 +215,7 @@ object HiveExample {
     }
     println(" ********** Total Rows Fetched When Quering The Out Of Order Columns **********" +
       s"$outOfOrderColFetched")
-    assert(outOfOrderColFetched == 2)
+    assert(outOfOrderColFetched == 4)
 
     hiveEmbeddedServer2.stop()
   }
diff --git a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
index c5db40b..268bf5f 100644
--- a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
+++ b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
@@ -127,6 +127,7 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
   }
 
   test("HiveExample") {
-    HiveExample.exampleBody(spark, TestQueryExecutor.warehouse)
+    HiveExample.createCarbonTable(TestQueryExecutor.warehouse)
+    HiveExample.readFromHive
   }
 }
diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml
index 6df4f24..7056852 100644
--- a/integration/hive/pom.xml
+++ b/integration/hive/pom.xml
@@ -65,11 +65,6 @@
             <scope>compile</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.carbondata</groupId>
-            <artifactId>carbondata-spark2</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-service</artifactId>
             <version>${hive.version}</version>
@@ -108,6 +103,10 @@
             <artifactId>scalatest_${scala.binary.version}</artifactId>
             <scope>test</scope>
         </dependency>
+      <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+      </dependency>
     </dependencies>
 
     <build>
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
index a473303..7d9656e 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
@@ -91,8 +91,10 @@ public class CarbonHiveInputSplit extends FileSplit
   }
 
   public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
-      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version) {
+      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
+      BlockletDetailInfo detailInfo) {
     this(segmentId, path, start, length, locations, version);
+    this.detailInfo = detailInfo;
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
@@ -110,8 +112,8 @@ public class CarbonHiveInputSplit extends FileSplit
    */
   public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
       String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
-      Map<String, String> blockStorageIdMap) {
-    this(segmentId, path, start, length, locations, numberOfBlocklets, version);
+      Map<String, String> blockStorageIdMap, BlockletDetailInfo detailInfo) {
+    this(segmentId, path, start, length, locations, numberOfBlocklets, version, detailInfo);
     this.blockStorageIdMap = blockStorageIdMap;
   }
 
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index 3ca8cf1..df25e5e 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.io.Writable;
  * It transparently passes the object to/from the Carbon file reader/writer.
  */
 @SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES })
-class CarbonHiveSerDe extends AbstractSerDe {
+public class CarbonHiveSerDe extends AbstractSerDe {
   private final SerDeStats stats;
   private ObjectInspector objInspector;
 
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 1022576..64edae2 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -19,10 +19,11 @@ package org.apache.carbondata.hive;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -72,7 +73,8 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
     } else {
       if (paths != null) {
         for (String inputPath : inputPaths) {
-          if (paths.startsWith(inputPath.replace("file:", ""))) {
+          inputPath = inputPath.replace("file:", "");
+          if (FileFactory.isFileExist(inputPath)) {
             validInputPath = inputPath;
             break;
           }
@@ -101,8 +103,12 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
   }
 
   @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+    jobConf.set(DATABASE_NAME, "_dummyDb_" + UUID.randomUUID().toString());
+    jobConf.set(TABLE_NAME, "_dummyTable_" + UUID.randomUUID().toString());
     org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
-    List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
+    CarbonTableInputFormat carbonTableInputFormat = new CarbonTableInputFormat();
+    List<org.apache.hadoop.mapreduce.InputSplit> splitList =
+        carbonTableInputFormat.getSplits(jobContext);
     InputSplit[] splits = new InputSplit[splitList.size()];
     CarbonInputSplit split;
     for (int i = 0; i < splitList.size(); i++) {
@@ -110,13 +116,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
       CarbonHiveInputSplit inputSplit = new CarbonHiveInputSplit(split.getSegmentId(),
               split.getPath(), split.getStart(), split.getLength(),
               split.getLocations(), split.getNumberOfBlocklets(),
-              split.getVersion(), split.getBlockStorageIdMap());
-      BlockletDetailInfo info = new BlockletDetailInfo();
-      info.setBlockSize(split.getLength());
-      info.setBlockFooterOffset(split.getDetailInfo().getBlockFooterOffset());
-      info.setVersionNumber(split.getVersion().number());
-      info.setUseMinMaxForPruning(false);
-      inputSplit.setDetailInfo(info);
+              split.getVersion(), split.getBlockStorageIdMap(), split.getDetailInfo());
       splits[i] = inputSplit;
     }
     return splits;
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
index f0071d4..427e248 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -19,20 +19,22 @@ package org.apache.carbondata.hive;
 import java.io.IOException;
 import java.util.Properties;
 
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Progressable;
 
 /**
  * TODO : To extend CarbonOutputFormat
  */
-class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T>
+public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
     implements HiveOutputFormat<Void, T> {
 
   @Override
@@ -41,6 +43,12 @@ class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T>
     return null;
   }
 
+  @Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf)
+      throws IOException {
+    org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
+    super.checkOutputSpecs(jobContext);
+  }
+
   @Override public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
       Progressable progress) throws IOException {
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java
similarity index 94%
rename from integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
rename to integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java
index 0b42ab9..17461b5 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java
@@ -15,15 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.carbondata.hive.server;
+package org.apache.carbondata.hive.test.server;
 
 import java.io.File;
 import java.lang.reflect.Field;
-import java.security.SecureRandom;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -48,20 +46,22 @@ import org.apache.log4j.Logger;
  * a child JVM (which Hive calls local) or external.
  */
 public class HiveEmbeddedServer2 {
-  private static final String SCRATCH_DIR = "/tmp/hive";
+  private String SCRATCH_DIR = "";
   private static final Logger log = LogServiceFactory.getLogService(Hive.class.getName());
   private HiveServer2 hiveServer;
   private HiveConf config;
   private int port;
-  private static Random secureRandom = new SecureRandom();
 
-  public void start() throws Exception {
+  public void start(String storePath) throws Exception {
     log.info("Starting Hive Local/Embedded Server...");
+    SCRATCH_DIR = storePath;
     if (hiveServer == null) {
       config = configure();
       hiveServer = new HiveServer2();
       port = MetaStoreUtils.findFreePort();
       config.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, port);
+      config.setBoolVar(ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE, true);
+      config.setBoolVar(ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
       hiveServer.init(config);
       hiveServer.start();
       waitForStartup();
@@ -126,14 +126,12 @@ public class HiveEmbeddedServer2 {
       }
     }
 
-    int random = secureRandom.nextInt();
-
-    conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse" + random);
-    conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db" + random);
+    conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse");
+    conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db");
     conf.set("hive.exec.scratchdir", scratchDir);
     conf.set("fs.permissions.umask-mode", "022");
     conf.set("javax.jdo.option.ConnectionURL",
-        "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + random + ";create=true");
+        "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + ";create=true");
     conf.set("hive.metastore.local", "true");
     conf.set("hive.aux.jars.path", "");
     conf.set("hive.added.jars.path", "");
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index 3fd2b03..75be897 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -109,6 +109,12 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-spark2</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
index 7216134..49e8e98 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hive.MapredCarbonInputFormat
 
 class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll {
 
@@ -57,7 +57,7 @@ class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll {
       if (SparkUtil.isSparkVersionEqualTo("2.2")) {
         assertResult(table.storage.locationUri.get)(new Path(s"file:$storeLocation/source").toUri)
       }
-      assertResult(table.storage.inputFormat.get)(classOf[CarbonTableInputFormat[_]].getName)
+      assertResult(table.storage.inputFormat.get)(classOf[MapredCarbonInputFormat].getName)
     }
   }
 
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 0a5f096..df683e0 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -37,6 +37,11 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-hive</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-streaming</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index bdacfcd..4fc30d02 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructField
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.hadoop.api.{CarbonTableInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hive.{CarbonHiveSerDe, MapredCarbonInputFormat, MapredCarbonOutputFormat}
 
 /**
  * Reflection APIs
@@ -357,14 +357,17 @@ object CarbonReflectionUtils {
         val updatedSerdeMap =
           serdeMap ++ Map[String, HiveSerDe](
             ("org.apache.spark.sql.carbonsource", HiveSerDe(Some(
-              classOf[CarbonTableInputFormat[_]].getName),
-              Some(classOf[CarbonTableOutputFormat].getName))),
+              classOf[MapredCarbonInputFormat].getName),
+              Some(classOf[MapredCarbonOutputFormat[_]].getName),
+              Some(classOf[CarbonHiveSerDe].getName))),
             ("carbon", HiveSerDe(Some(
-              classOf[CarbonTableInputFormat[_]].getName),
-              Some(classOf[CarbonTableOutputFormat].getName))),
+              classOf[MapredCarbonInputFormat].getName),
+              Some(classOf[MapredCarbonOutputFormat[_]].getName),
+              Some(classOf[CarbonHiveSerDe].getName))),
             ("carbondata", HiveSerDe(Some(
-              classOf[CarbonTableInputFormat[_]].getName),
-              Some(classOf[CarbonTableOutputFormat].getName))))
+              classOf[MapredCarbonInputFormat].getName),
+              Some(classOf[MapredCarbonOutputFormat[_]].getName),
+              Some(classOf[CarbonHiveSerDe].getName))))
         instanceMirror.reflectField(field.asTerm).set(updatedSerdeMap)
       case _ =>
     }
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala
index 301644f..8cbae2e 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala
@@ -78,6 +78,7 @@ class DictionaryLRUCacheTestCase extends Spark2QueryTest with BeforeAndAfterAll
 
     path = s"$resourcesPath/restructure/data_2000.csv"
 
+    sql("use default")
     sql("drop table if exists carbon_new1")
     sql("drop table if exists carbon_new2")
     sql("drop table if exists carbon_new3")
diff --git a/pom.xml b/pom.xml
index 6e3bde2..51c5e25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
     <module>hadoop</module>
     <module>integration/spark-common</module>
     <module>integration/spark-common-test</module>
+    <module>integration/hive</module>
     <module>datamap/examples</module>
     <module>store/sdk</module>
     <module>assembly</module>