Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/28 14:13:09 UTC
[carbondata] branch master updated: [CARBONDATA-3364] Support Read from Hive. Fixes queries returning empty results from Hive.
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new fcca6c5 [CARBONDATA-3364] Support Read from Hive. Fixes queries returning empty results from Hive.
fcca6c5 is described below
commit fcca6c5b661ec02adfa17622e980a0c396bd84c2
Author: dhatchayani <dh...@gmail.com>
AuthorDate: Mon Apr 29 18:52:57 2019 +0530
[CARBONDATA-3364] Support Read from Hive. Fixes queries returning empty results from Hive.
This closes #3192
---
.../apache/carbondata/examples/HiveExample.scala | 99 +++++++++++++---------
.../apache/carbondata/examplesCI/RunExamples.scala | 3 +-
integration/hive/pom.xml | 9 +-
.../carbondata/hive/CarbonHiveInputSplit.java | 8 +-
.../apache/carbondata/hive/CarbonHiveSerDe.java | 2 +-
.../carbondata/hive/MapredCarbonInputFormat.java | 20 ++---
.../carbondata/hive/MapredCarbonOutputFormat.java | 12 ++-
.../{ => test}/server/HiveEmbeddedServer2.java | 20 ++---
integration/spark-common-test/pom.xml | 6 ++
.../TestCreateHiveTableWithCarbonDS.scala | 4 +-
integration/spark-common/pom.xml | 5 ++
.../apache/spark/util/CarbonReflectionUtils.scala | 17 ++--
.../spark/util/DictionaryLRUCacheTestCase.scala | 1 +
pom.xml | 1 +
14 files changed, 123 insertions(+), 84 deletions(-)
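In short: the example now creates and loads the Carbon table through a CarbonSession, closes that session, and then reads the same table back over Hive JDBC against an embedded HiveServer2. A minimal sketch of that read path (assuming the embedded server started by this patch is running and HIVE_CARBON_EXAMPLE has been loaded, as in the reworked HiveExample below):

    import java.sql.{DriverManager, ResultSet, Statement}

    // Register the Hive JDBC driver and connect to the embedded HiveServer2;
    // `port` is assumed to be the value returned by hiveEmbeddedServer2.getFreePort.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
    val statement: Statement = connection.createStatement

    // Before this fix the query came back empty; now it returns the loaded rows.
    val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")
    while (resultSet.next()) {
      println(s"${resultSet.getInt("ID")},${resultSet.getString("NAME")},${resultSet.getDouble("SALARY")}")
    }
    connection.close()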
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
index b50e763..c043076 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala
@@ -19,33 +19,36 @@ package org.apache.carbondata.examples
import java.io.File
import java.sql.{DriverManager, ResultSet, Statement}
-import org.apache.spark.sql.SparkSession
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.examples.util.ExampleUtils
-import org.apache.carbondata.hive.server.HiveEmbeddedServer2
+import org.apache.carbondata.hive.test.server.HiveEmbeddedServer2
// scalastyle:off println
object HiveExample {
private val driverName: String = "org.apache.hive.jdbc.HiveDriver"
- def main(args: Array[String]) {
- val carbonSession = ExampleUtils.createCarbonSession("HiveExample")
- exampleBody(carbonSession, CarbonProperties.getStorePath
- + CarbonCommonConstants.FILE_SEPARATOR
- + CarbonCommonConstants.DATABASE_DEFAULT_NAME)
- carbonSession.stop()
+ val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ private val targetLoc = s"$rootPath/examples/spark2/target"
+ val metaStoreLoc = s"$targetLoc/metastore_db"
+ val storeLocation = s"$targetLoc/store"
+ val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def main(args: Array[String]) {
+ createCarbonTable(storeLocation)
+ readFromHive
System.exit(0)
}
- def exampleBody(carbonSession: SparkSession, store: String): Unit = {
- val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
+ def createCarbonTable(store: String): Unit = {
+
+ val carbonSession = ExampleUtils.createCarbonSession("HiveExample")
carbonSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)
@@ -56,14 +59,44 @@ object HiveExample {
| STORED BY 'carbondata'
""".stripMargin)
+ val inputPath = FileFactory
+ .getUpdatedFilePath(s"$rootPath/examples/spark2/src/main/resources/sample.csv")
+
carbonSession.sql(
s"""
- | LOAD DATA LOCAL INPATH '$rootPath/examples/spark2/src/main/resources/sample.csv'
+ | LOAD DATA LOCAL INPATH '$inputPath'
+ | INTO TABLE HIVE_CARBON_EXAMPLE
+ """.stripMargin)
+
+ carbonSession.sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputPath'
| INTO TABLE HIVE_CARBON_EXAMPLE
""".stripMargin)
carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
+ carbonSession.close()
+
+ // delete any existing lock on the metastore so that a new Derby instance
+ // for HiveServer can run on the same metastore
+ checkAndDeleteDBLock
+
+ }
+
+ def checkAndDeleteDBLock: Unit = {
+ val dbLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/db.lck")
+ val dbexLockPath = FileFactory.getUpdatedFilePath(s"$metaStoreLoc/dbex.lck")
+ if (FileFactory.isFileExist(dbLockPath)) {
+ FileFactory.deleteFile(dbLockPath, FileFactory.getFileType(dbLockPath))
+ }
+ if (FileFactory.isFileExist(dbexLockPath)) {
+ FileFactory.deleteFile(dbexLockPath, FileFactory.getFileType(dbexLockPath))
+ }
+ }
+
+
+ def readFromHive: Unit = {
try {
Class.forName(driverName)
}
@@ -72,37 +105,19 @@ object HiveExample {
classNotFoundException.printStackTrace()
}
+ // make the target directory world-writable so the embedded Hive server can use it
+ val path = new Path(targetLoc)
+ val fileSys = path.getFileSystem(FileFactory.getConfiguration)
+ fileSys.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+
val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
- hiveEmbeddedServer2.start()
+ hiveEmbeddedServer2.start(targetLoc)
val port = hiveEmbeddedServer2.getFreePort
val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
val statement: Statement = connection.createStatement
logger.info(s"============HIVE CLI IS STARTED ON PORT $port ==============")
- statement.execute(
- s"""
- | CREATE TABLE IF NOT EXISTS HIVE_CARBON_EXAMPLE
- | (ID int, NAME string,SALARY double)
- | ROW FORMAT SERDE 'org.apache.carbondata.hive.CarbonHiveSerDe'
- | WITH SERDEPROPERTIES ('mapreduce.input.carboninputformat.databaseName'='default',
- | 'mapreduce.input.carboninputformat.tableName'='HIVE_CARBON_EXAMPLE')
- """.stripMargin)
-
- statement.execute(
- s"""
- | ALTER TABLE HIVE_CARBON_EXAMPLE
- | SET FILEFORMAT
- | INPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonInputFormat\"
- | OUTPUTFORMAT \"org.apache.carbondata.hive.MapredCarbonOutputFormat\"
- | SERDE \"org.apache.carbondata.hive.CarbonHiveSerDe\"
- """.stripMargin)
-
- statement
- .execute(
- "ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
- s"'file:///$store/hive_carbon_example' ")
-
val resultSet: ResultSet = statement.executeQuery("SELECT * FROM HIVE_CARBON_EXAMPLE")
var rowsFetched = 0
@@ -135,7 +150,7 @@ object HiveExample {
rowsFetched = rowsFetched + 1
}
println(s"******Total Number Of Rows Fetched ****** $rowsFetched")
- assert(rowsFetched == 2)
+ assert(rowsFetched == 4)
logger.info("Fetching the Individual Columns ")
@@ -166,7 +181,7 @@ object HiveExample {
}
println(" ********** Total Rows Fetched When Quering The Individual Columns **********" +
s"$individualColRowsFetched")
- assert(individualColRowsFetched == 2)
+ assert(individualColRowsFetched == 4)
logger.info("Fetching the Out Of Order Columns ")
@@ -200,7 +215,7 @@ object HiveExample {
}
println(" ********** Total Rows Fetched When Quering The Out Of Order Columns **********" +
s"$outOfOrderColFetched")
- assert(outOfOrderColFetched == 2)
+ assert(outOfOrderColFetched == 4)
hiveEmbeddedServer2.stop()
}
diff --git a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
index c5db40b..268bf5f 100644
--- a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
+++ b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
@@ -127,6 +127,7 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
}
test("HiveExample") {
- HiveExample.exampleBody(spark, TestQueryExecutor.warehouse)
+ HiveExample.createCarbonTable(TestQueryExecutor.warehouse)
+ HiveExample.readFromHive
}
}
diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml
index 6df4f24..7056852 100644
--- a/integration/hive/pom.xml
+++ b/integration/hive/pom.xml
@@ -65,11 +65,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark2</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
@@ -108,6 +103,10 @@
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
index a473303..7d9656e 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
@@ -91,8 +91,10 @@ public class CarbonHiveInputSplit extends FileSplit
}
public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
- String[] locations, int numberOfBlocklets, ColumnarFormatVersion version) {
+ String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
+ BlockletDetailInfo detailInfo) {
this(segmentId, path, start, length, locations, version);
+ this.detailInfo = detailInfo;
this.numberOfBlocklets = numberOfBlocklets;
}
@@ -110,8 +112,8 @@ public class CarbonHiveInputSplit extends FileSplit
*/
public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
- Map<String, String> blockStorageIdMap) {
- this(segmentId, path, start, length, locations, numberOfBlocklets, version);
+ Map<String, String> blockStorageIdMap, BlockletDetailInfo detailInfo) {
+ this(segmentId, path, start, length, locations, numberOfBlocklets, version, detailInfo);
this.blockStorageIdMap = blockStorageIdMap;
}
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index 3ca8cf1..df25e5e 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.io.Writable;
* It transparently passes the object to/from the Carbon file reader/writer.
*/
@SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES })
-class CarbonHiveSerDe extends AbstractSerDe {
+public class CarbonHiveSerDe extends AbstractSerDe {
private final SerDeStats stats;
private ObjectInspector objInspector;
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 1022576..64edae2 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -19,10 +19,11 @@ package org.apache.carbondata.hive;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -72,7 +73,8 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
} else {
if (paths != null) {
for (String inputPath : inputPaths) {
- if (paths.startsWith(inputPath.replace("file:", ""))) {
+ inputPath = inputPath.replace("file:", "");
+ if (FileFactory.isFileExist(inputPath)) {
validInputPath = inputPath;
break;
}
@@ -101,8 +103,12 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
}
@Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+ jobConf.set(DATABASE_NAME, "_dummyDb_" + UUID.randomUUID().toString());
+ jobConf.set(TABLE_NAME, "_dummyTable_" + UUID.randomUUID().toString());
org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
- List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
+ CarbonTableInputFormat carbonTableInputFormat = new CarbonTableInputFormat();
+ List<org.apache.hadoop.mapreduce.InputSplit> splitList =
+ carbonTableInputFormat.getSplits(jobContext);
InputSplit[] splits = new InputSplit[splitList.size()];
CarbonInputSplit split;
for (int i = 0; i < splitList.size(); i++) {
@@ -110,13 +116,7 @@ public class MapredCarbonInputFormat extends CarbonTableInputFormat<ArrayWritabl
CarbonHiveInputSplit inputSplit = new CarbonHiveInputSplit(split.getSegmentId(),
split.getPath(), split.getStart(), split.getLength(),
split.getLocations(), split.getNumberOfBlocklets(),
- split.getVersion(), split.getBlockStorageIdMap());
- BlockletDetailInfo info = new BlockletDetailInfo();
- info.setBlockSize(split.getLength());
- info.setBlockFooterOffset(split.getDetailInfo().getBlockFooterOffset());
- info.setVersionNumber(split.getVersion().number());
- info.setUseMinMaxForPruning(false);
- inputSplit.setDetailInfo(info);
+ split.getVersion(), split.getBlockStorageIdMap(), split.getDetailInfo());
splits[i] = inputSplit;
}
return splits;
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
index f0071d4..427e248 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -19,20 +19,22 @@ package org.apache.carbondata.hive;
import java.io.IOException;
import java.util.Properties;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Progressable;
/**
* TODO : To extend CarbonOutputFormat
*/
-class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T>
+public class MapredCarbonOutputFormat<T> extends CarbonTableOutputFormat
implements HiveOutputFormat<Void, T> {
@Override
@@ -41,6 +43,12 @@ class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T>
return null;
}
+ @Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf)
+ throws IOException {
+ org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
+ super.checkOutputSpecs(jobContext);
+ }
+
@Override public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
Progressable progress) throws IOException {
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java b/integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java
similarity index 94%
rename from integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
rename to integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java
index 0b42ab9..17461b5 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/server/HiveEmbeddedServer2.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/test/server/HiveEmbeddedServer2.java
@@ -15,15 +15,13 @@
* limitations under the License.
*/
-package org.apache.carbondata.hive.server;
+package org.apache.carbondata.hive.test.server;
import java.io.File;
import java.lang.reflect.Field;
-import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -48,20 +46,22 @@ import org.apache.log4j.Logger;
* a child JVM (which Hive calls local) or external.
*/
public class HiveEmbeddedServer2 {
- private static final String SCRATCH_DIR = "/tmp/hive";
+ private String SCRATCH_DIR = "";
private static final Logger log = LogServiceFactory.getLogService(Hive.class.getName());
private HiveServer2 hiveServer;
private HiveConf config;
private int port;
- private static Random secureRandom = new SecureRandom();
- public void start() throws Exception {
+ public void start(String storePath) throws Exception {
log.info("Starting Hive Local/Embedded Server...");
+ SCRATCH_DIR = storePath;
if (hiveServer == null) {
config = configure();
hiveServer = new HiveServer2();
port = MetaStoreUtils.findFreePort();
config.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, port);
+ config.setBoolVar(ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE, true);
+ config.setBoolVar(ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
hiveServer.init(config);
hiveServer.start();
waitForStartup();
@@ -126,14 +126,12 @@ public class HiveEmbeddedServer2 {
}
}
- int random = secureRandom.nextInt();
-
- conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse" + random);
- conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db" + random);
+ conf.set("hive.metastore.warehouse.dir", scratchDir + "/warehouse");
+ conf.set("hive.metastore.metadb.dir", scratchDir + "/metastore_db");
conf.set("hive.exec.scratchdir", scratchDir);
conf.set("fs.permissions.umask-mode", "022");
conf.set("javax.jdo.option.ConnectionURL",
- "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + random + ";create=true");
+ "jdbc:derby:;databaseName=" + scratchDir + "/metastore_db" + ";create=true");
conf.set("hive.metastore.local", "true");
conf.set("hive.aux.jars.path", "");
conf.set("hive.added.jars.path", "");
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index 3fd2b03..75be897 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -109,6 +109,12 @@
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-spark2</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
index 7216134..49e8e98 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hive.MapredCarbonInputFormat
class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll {
@@ -57,7 +57,7 @@ class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll {
if (SparkUtil.isSparkVersionEqualTo("2.2")) {
assertResult(table.storage.locationUri.get)(new Path(s"file:$storeLocation/source").toUri)
}
- assertResult(table.storage.inputFormat.get)(classOf[CarbonTableInputFormat[_]].getName)
+ assertResult(table.storage.inputFormat.get)(classOf[MapredCarbonInputFormat].getName)
}
}
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index 0a5f096..df683e0 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -37,6 +37,11 @@
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hive</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-streaming</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index bdacfcd..4fc30d02 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructField
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.hadoop.api.{CarbonTableInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hive.{CarbonHiveSerDe, MapredCarbonInputFormat, MapredCarbonOutputFormat}
/**
* Reflection APIs
@@ -357,14 +357,17 @@ object CarbonReflectionUtils {
val updatedSerdeMap =
serdeMap ++ Map[String, HiveSerDe](
("org.apache.spark.sql.carbonsource", HiveSerDe(Some(
- classOf[CarbonTableInputFormat[_]].getName),
- Some(classOf[CarbonTableOutputFormat].getName))),
+ classOf[MapredCarbonInputFormat].getName),
+ Some(classOf[MapredCarbonOutputFormat[_]].getName),
+ Some(classOf[CarbonHiveSerDe].getName))),
("carbon", HiveSerDe(Some(
- classOf[CarbonTableInputFormat[_]].getName),
- Some(classOf[CarbonTableOutputFormat].getName))),
+ classOf[MapredCarbonInputFormat].getName),
+ Some(classOf[MapredCarbonOutputFormat[_]].getName),
+ Some(classOf[CarbonHiveSerDe].getName))),
("carbondata", HiveSerDe(Some(
- classOf[CarbonTableInputFormat[_]].getName),
- Some(classOf[CarbonTableOutputFormat].getName))))
+ classOf[MapredCarbonInputFormat].getName),
+ Some(classOf[MapredCarbonOutputFormat[_]].getName),
+ Some(classOf[CarbonHiveSerDe].getName))))
instanceMirror.reflectField(field.asTerm).set(updatedSerdeMap)
case _ =>
}
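Net effect of this hunk: the reflective update of Spark's HiveSerDe.serdeMap now registers the Hive-aware input/output formats and, newly, the SerDe for all three Carbon provider names, which is what lets Hive read tables created through these sources. A quick sketch of what the lookup should return afterwards (assuming Spark 2.x's org.apache.spark.sql.internal.HiveSerDe API and that the reflective update above has run):

    import org.apache.spark.sql.internal.HiveSerDe

    // sourceToSerDe consults the same serdeMap that the reflection above patched.
    HiveSerDe.sourceToSerDe("carbondata").foreach { serde =>
      println(serde.inputFormat)  // Some(org.apache.carbondata.hive.MapredCarbonInputFormat)
      println(serde.outputFormat) // Some(org.apache.carbondata.hive.MapredCarbonOutputFormat)
      println(serde.serde)        // Some(org.apache.carbondata.hive.CarbonHiveSerDe)
    }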
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala
index 301644f..8cbae2e 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryLRUCacheTestCase.scala
@@ -78,6 +78,7 @@ class DictionaryLRUCacheTestCase extends Spark2QueryTest with BeforeAndAfterAll
path = s"$resourcesPath/restructure/data_2000.csv"
+ sql("use default")
sql("drop table if exists carbon_new1")
sql("drop table if exists carbon_new2")
sql("drop table if exists carbon_new3")
diff --git a/pom.xml b/pom.xml
index 6e3bde2..51c5e25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
<module>hadoop</module>
<module>integration/spark-common</module>
<module>integration/spark-common-test</module>
+ <module>integration/hive</module>
<module>datamap/examples</module>
<module>store/sdk</module>
<module>assembly</module>