You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/06/01 15:28:54 UTC
[1/2] carbondata git commit: resolved int, short type bug for hive,
modified the hive example,
resolved unsafeintermediatefilemerger datatype issue
Repository: carbondata
Updated Branches:
refs/heads/master ddb80f729 -> 50627c047
resolved int, short type bug for hive, modified the hive example, resolved unsafeintermediatefilemerger datatype issue
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb0405d6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb0405d6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb0405d6
Branch: refs/heads/master
Commit: eb0405d65fa9df3f7b1a994079b0ab3103a57484
Parents: ddb80f7
Author: anubhav100 <an...@knoldus.in>
Authored: Wed May 31 12:40:38 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Jun 1 23:27:55 2017 +0800
----------------------------------------------------------------------
.../carbondata/hive/CarbonHiveRecordReader.java | 53 ++++----
.../carbondata/hiveexample/HiveExample.scala | 122 ++++++-------------
.../merger/UnsafeIntermediateFileMerger.java | 6 +
3 files changed, 70 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb0405d6/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
index e7e342c..add4baf 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -16,7 +16,6 @@
*/
package org.apache.carbondata.hive;
-
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
@@ -41,7 +40,11 @@ import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -61,7 +64,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
private CarbonObjectInspector objInspector;
public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
- InputSplit inputSplit, JobConf jobConf) throws IOException {
+ InputSplit inputSplit, JobConf jobConf) throws IOException {
super(queryModel, readSupport);
initialize(inputSplit, jobConf);
}
@@ -78,16 +81,16 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
}
List<TableBlockInfo> tableBlockInfoList = CarbonHiveInputSplit.createBlocks(splitList);
queryModel.setTableBlockInfos(tableBlockInfoList);
- readSupport.initialize(queryModel.getProjectionColumns(),
- queryModel.getAbsoluteTableIdentifier());
+ readSupport
+ .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
try {
carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
} catch (QueryExecutionException e) {
throw new IOException(e.getMessage(), e.getCause());
}
if (valueObj == null) {
- valueObj = new ArrayWritable(Writable.class,
- new Writable[queryModel.getProjectionColumns().length]);
+ valueObj =
+ new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]);
}
final TypeInfo rowTypeInfo;
@@ -120,8 +123,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
}
- @Override
- public boolean next(Void aVoid, ArrayWritable value) throws IOException {
+ @Override public boolean next(Void aVoid, ArrayWritable value) throws IOException {
if (carbonIterator.hasNext()) {
Object obj = readSupport.readRow(carbonIterator.next());
ArrayWritable tmpValue = null;
@@ -138,11 +140,12 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
} else {
if (arrValue.length != arrCurrent.length) {
- throw new IOException("CarbonHiveInput : size of object differs. Value" +
- " size : " + arrValue.length + ", Current Object size : " + arrCurrent.length);
+ throw new IOException(
+ "CarbonHiveInput : size of object differs. Value" + " size : " + arrValue.length
+ + ", Current Object size : " + arrCurrent.length);
} else {
- throw new IOException("CarbonHiveInput can not support RecordReaders that" +
- " don't return same key & value & value is null");
+ throw new IOException("CarbonHiveInput can not support RecordReaders that"
+ + " don't return same key & value & value is null");
}
}
}
@@ -156,23 +159,19 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
return createStruct(obj, objInspector);
}
- @Override
- public Void createKey() {
+ @Override public Void createKey() {
return null;
}
- @Override
- public ArrayWritable createValue() {
+ @Override public ArrayWritable createValue() {
return valueObj;
}
- @Override
- public long getPos() throws IOException {
+ @Override public long getPos() throws IOException {
return 0;
}
- @Override
- public float getProgress() throws IOException {
+ @Override public float getProgress() throws IOException {
return 0;
}
@@ -190,7 +189,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
}
private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
- throws SerDeException {
+ throws SerDeException {
List sourceArray = inspector.getList(obj);
ObjectInspector subInspector = inspector.getListElementObjectInspector();
List array = new ArrayList();
@@ -208,7 +207,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
ArrayWritable subArray = new ArrayWritable(((Writable) array.get(0)).getClass(),
(Writable[]) array.toArray(new Writable[array.size()]));
- return new ArrayWritable(Writable.class, new Writable[]{subArray});
+ return new ArrayWritable(Writable.class, new Writable[] { subArray });
}
return null;
}
@@ -224,11 +223,11 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
case DOUBLE:
return new DoubleWritable((double) obj);
case INT:
- return new IntWritable(((Long) obj).intValue());
+ return new IntWritable((int) obj);
case LONG:
return new LongWritable((long) obj);
case SHORT:
- return new ShortWritable(((Long) obj).shortValue());
+ return new ShortWritable((Short) obj);
case DATE:
return new DateWritable(new Date(Long.parseLong(String.valueOf(obj.toString()))));
case TIMESTAMP:
@@ -236,8 +235,8 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
case STRING:
return new Text(obj.toString());
case DECIMAL:
- return new HiveDecimalWritable(HiveDecimal.create(
- ((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
+ return new HiveDecimalWritable(
+ HiveDecimal.create(((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
}
throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb0405d6/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
index 6d19049..9c1d51e 100644
--- a/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
+++ b/integration/hive/src/main/scala/org/apache/carbondata/hiveexample/HiveExample.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.hiveexample
import java.io.File
-import java.sql.{DriverManager, ResultSet, SQLException, Statement}
+import java.sql.{DriverManager, ResultSet, Statement}
import org.apache.spark.sql.SparkSession
@@ -29,11 +29,6 @@ object HiveExample {
private val driverName: String = "org.apache.hive.jdbc.HiveDriver"
- /**
- * @param args
- * @throws SQLException
- */
- @throws[SQLException]
def main(args: Array[String]) {
val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
@@ -41,46 +36,38 @@ object HiveExample {
val warehouse = s"$rootPath/integration/hive/target/warehouse"
val metaStore_Db = s"$rootPath/integration/hive/target/carbon_metaStore_db"
val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ var resultId = ""
+ var resultName = ""
+ var resultSalary = ""
- import org.apache.spark.sql.CarbonSession._
- System.setProperty("hadoop.home.dir", "/")
+ import org.apache.spark.sql.CarbonSession._
- val carbon = SparkSession
+ val carbonSession = SparkSession
.builder()
.master("local")
.appName("HiveExample")
- .config("carbon.sql.warehouse.dir", warehouse).enableHiveSupport()
+ .config("carbonSession.sql.warehouse.dir", warehouse).enableHiveSupport()
.getOrCreateCarbonSession(
store, metaStore_Db)
- val carbonHadoopJarPath = s"$rootPath/assembly/target/scala-2.11/carbondata_2.11-1.1" +
- ".0-incubating-SNAPSHOT-shade-hadoop2.7.2.jar"
-
- val carbon_DefaultHadoopVersion_JarPath =
- s"$rootPath/assembly/target/scala-2.11/carbondata_2.11-1.1" +
- ".0-incubating-SNAPSHOT-shade-hadoop2.2.0.jar"
-
- val hiveJarPath = s"$rootPath/integration/hive/target/carbondata-hive-1.1" +
- ".0-incubating-SNAPSHOT.jar"
-
- carbon.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)
+ carbonSession.sql("""DROP TABLE IF EXISTS HIVE_CARBON_EXAMPLE""".stripMargin)
- carbon
+ carbonSession
.sql(
"""CREATE TABLE HIVE_CARBON_EXAMPLE (ID int,NAME string,SALARY double) STORED BY
|'CARBONDATA' """
.stripMargin)
- carbon.sql(
+ carbonSession.sql(
s"""
LOAD DATA LOCAL INPATH '$rootPath/integration/hive/src/main/resources/data.csv' INTO
TABLE
HIVE_CARBON_EXAMPLE
""")
- carbon.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
+ carbonSession.sql("SELECT * FROM HIVE_CARBON_EXAMPLE").show()
- carbon.stop()
+ carbonSession.stop()
try {
Class.forName(driverName)
@@ -93,84 +80,49 @@ object HiveExample {
val hiveEmbeddedServer2 = new HiveEmbeddedServer2()
hiveEmbeddedServer2.start()
val port = hiveEmbeddedServer2.getFreePort
- val con = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
- val stmt: Statement = con.createStatement
+ val connection = DriverManager.getConnection(s"jdbc:hive2://localhost:$port/default", "", "")
+ val statement: Statement = connection.createStatement
logger.info(s"============HIVE CLI IS STARTED ON PORT $port ==============")
- try {
- stmt
- .execute(s"ADD JAR $carbonHadoopJarPath")
- }
- catch {
- case exception: Exception =>
- logger.warn(s"Jar Not Found $carbonHadoopJarPath" + "Looking For hadoop 2.2.0 version jar")
- try {
- stmt
- .execute(s"ADD JAR $carbon_DefaultHadoopVersion_JarPath")
- }
- catch {
- case exception: Exception => logger
- .error("Exception Occurs:Neither One of Jar is Found" +
- s"$carbon_DefaultHadoopVersion_JarPath,$carbonHadoopJarPath" +
- "Atleast One Should Be Build")
- hiveEmbeddedServer2.stop()
- System.exit(0)
- }
- }
- try {
- stmt
- .execute(s"ADD JAR $hiveJarPath")
- }
- catch {
- case exception: Exception => logger.error(s"Exception Occurs:Jar Not Found $hiveJarPath")
- hiveEmbeddedServer2.stop()
- System.exit(0)
-
- }
- stmt.execute("set hive.mapred.supports.subdirectories=true")
- stmt.execute("set mapreduce.input.fileinputformat.input.dir.recursive=true")
-
-
- stmt.execute("CREATE TABLE IF NOT EXISTS " + "HIVE_CARBON_EXAMPLE " +
- " (ID int, NAME string,SALARY double)")
- stmt
+ statement.execute("CREATE TABLE IF NOT EXISTS " + "HIVE_CARBON_EXAMPLE " +
+ " (ID int, NAME string,SALARY double)")
+ statement
.execute(
"ALTER TABLE HIVE_CARBON_EXAMPLE SET FILEFORMAT INPUTFORMAT \"org.apache.carbondata." +
"hive.MapredCarbonInputFormat\"OUTPUTFORMAT \"org.apache.carbondata.hive." +
"MapredCarbonOutputFormat\"SERDE \"org.apache.carbondata.hive." +
"CarbonHiveSerDe\" ")
- stmt
+ statement
.execute(
"ALTER TABLE HIVE_CARBON_EXAMPLE SET LOCATION " +
s"'file:///$store/default/hive_carbon_example' ")
-
val sql = "SELECT * FROM HIVE_CARBON_EXAMPLE"
- val res: ResultSet = stmt.executeQuery(sql)
+ val resultSet: ResultSet = statement.executeQuery(sql)
var rowsFetched = 0
- while (res.next) {
+ while (resultSet.next) {
if (rowsFetched == 0) {
println("+---+" + "+-------+" + "+--------------+")
println("| ID|" + "| NAME |" + "| SALARY |")
println("+---+" + "+-------+" + "+--------------+")
- val resultId = res.getString("id")
- val resultName = res.getString("name")
- val resultSalary = res.getString("salary")
+ resultId = resultSet.getString("id")
+ resultName = resultSet.getString("name")
+ resultSalary = resultSet.getString("salary")
println(s"| $resultId |" + s"| $resultName |" + s"| $resultSalary |")
println("+---+" + "+-------+" + "+--------------+")
}
else {
- val resultId = res.getString("ID")
- val resultName = res.getString("NAME")
- val resultSalary = res.getString("SALARY")
+ resultId = resultSet.getString("ID")
+ resultName = resultSet.getString("NAME")
+ resultSalary = resultSet.getString("SALARY")
println(s"| $resultId |" + s"| $resultName |" + s"| $resultSalary |")
println("+---+" + "+-------+" + "+--------------+")
@@ -184,7 +136,7 @@ object HiveExample {
// fetching the separate columns
var individualColRowsFetched = 0
- val resultIndividualCol = stmt.executeQuery("SELECT NAME FROM HIVE_CARBON_EXAMPLE")
+ val resultIndividualCol = statement.executeQuery("SELECT NAME FROM HIVE_CARBON_EXAMPLE")
while (resultIndividualCol.next) {
if (individualColRowsFetched == 0) {
@@ -193,13 +145,13 @@ object HiveExample {
println("+---++---------+")
- val resultName = resultIndividualCol.getString("name")
+ resultName = resultIndividualCol.getString("name")
println(s"| $resultName |")
println("+---+" + "+---------+")
}
else {
- val resultName = resultIndividualCol.getString("NAME")
+ resultName = resultIndividualCol.getString("NAME")
println(s"| $resultName |")
println("+---+" + "+---------+")
@@ -211,8 +163,10 @@ object HiveExample {
logger.info("Fetching the Out Of Order Columns ")
- val resultOutOfOrderCol = stmt.executeQuery("SELECT SALARY,ID,NAME FROM HIVE_CARBON_EXAMPLE")
+ val resultOutOfOrderCol = statement
+ .executeQuery("SELECT SALARY,ID,NAME FROM HIVE_CARBON_EXAMPLE")
var outOfOrderColFetched = 0
+
while (resultOutOfOrderCol.next()) {
if (outOfOrderColFetched == 0) {
println("+---+" + "+-------+" + "+--------------+")
@@ -220,17 +174,17 @@ object HiveExample {
println("+---+" + "+-------+" + "+--------------+")
- val resultId = resultOutOfOrderCol.getString("id")
- val resultName = resultOutOfOrderCol.getString("name")
- val resultSalary = resultOutOfOrderCol.getString("salary")
+ resultId = resultOutOfOrderCol.getString("id")
+ resultName = resultOutOfOrderCol.getString("name")
+ resultSalary = resultOutOfOrderCol.getString("salary")
println(s"| $resultSalary |" + s"| $resultId |" + s"| $resultName |")
println("+---+" + "+-------+" + "+--------------+")
}
else {
- val resultId = resultOutOfOrderCol.getString("ID")
- val resultName = resultOutOfOrderCol.getString("NAME")
- val resultSalary = resultOutOfOrderCol.getString("SALARY")
+ resultId = resultOutOfOrderCol.getString("ID")
+ resultName = resultOutOfOrderCol.getString("NAME")
+ resultSalary = resultOutOfOrderCol.getString("SALARY")
println(s"| $resultSalary |" + s"| $resultId |" + s"| $resultName |")
println("+---+" + "+-------+" + "+--------------+")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb0405d6/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 90c3b69..c67e093 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -312,7 +312,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
if (null != value) {
switch (type[mesCount]) {
case SHORT:
+ rowData.putShort(size, (Short) value);
+ size += 2;
+ break;
case INT:
+ rowData.putInt(size, (Integer) value);
+ size += 4;
+ break;
case LONG:
rowData.putLong(size, (Long) value);
size += 8;
[2/2] carbondata git commit: resolved int, short type bug for hive,
modified the hive example. This closes #979
Posted by ch...@apache.org.
resolved int, short type bug for hive, modified the hive example. This closes #979
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/50627c04
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/50627c04
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/50627c04
Branch: refs/heads/master
Commit: 50627c047229976a48465a9e26aad94b4dabd2a3
Parents: ddb80f7 eb0405d
Author: chenliang613 <ch...@apache.org>
Authored: Thu Jun 1 23:28:35 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Jun 1 23:28:35 2017 +0800
----------------------------------------------------------------------
.../carbondata/hive/CarbonHiveRecordReader.java | 53 ++++----
.../carbondata/hiveexample/HiveExample.scala | 122 ++++++-------------
.../merger/UnsafeIntermediateFileMerger.java | 6 +
3 files changed, 70 insertions(+), 111 deletions(-)
----------------------------------------------------------------------