Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:25:07 UTC
[17/39] hive git commit: HIVE-13519: Allow LlapRecordReader to parse/output rows
HIVE-13519: Allow LlapRecordReader to parse/output rows
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc7343dd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc7343dd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc7343dd
Branch: refs/heads/master
Commit: fc7343dd12ac152267615e6ac67238ee06326452
Parents: 8f6b28a
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Apr 14 14:30:45 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Apr 14 14:30:45 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/jdbc/TestLlapInputSplit.java | 21 +-
.../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 38 ++--
jdbc/pom.xml | 5 +
.../apache/hive/jdbc/LlapBaseInputFormat.java | 135 ++++++++++++
.../src/java/org/apache/hive/jdbc/LlapDump.java | 11 +-
.../org/apache/hive/jdbc/LlapInputFormat.java | 135 ------------
.../apache/hive/jdbc/LlapRowInputFormat.java | 34 +++
llap-common/pom.xml | 21 ++
.../org/apache/hadoop/hive/llap/FieldDesc.java | 63 ++++++
.../hadoop/hive/llap/LlapRowRecordReader.java | 155 ++++++++++++++
.../java/org/apache/hadoop/hive/llap/Row.java | 166 +++++++++++++++
.../org/apache/hadoop/hive/llap/Schema.java | 76 +++++++
.../org/apache/hadoop/hive/llap/TypeDesc.java | 108 ++++++++++
.../org/apache/hadoop/hive/llap/TestRow.java | 92 +++++++++
.../hadoop/hive/llap/LlapInputFormat.java | 27 ++-
.../hadoop/hive/llap/LlapBaseRecordReader.java | 205 ++++++++++++++++++
.../apache/hadoop/hive/llap/LlapInputSplit.java | 27 +--
.../hadoop/hive/llap/LlapRecordReader.java | 206 -------------------
.../ql/udf/generic/GenericUDTFGetSplits.java | 87 +++++++-
.../hadoop/hive/llap/TestLlapOutputFormat.java | 6 +-
20 files changed, 1205 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
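For context, the row-based API added by this commit is exercised end to end in the TestJdbcWithMiniLlap changes below. A minimal standalone sketch of the same flow follows; the class name, connection URL, user/password, and query are illustrative placeholders, while the JobConf keys, input format, and Row API are taken from this diff:

import java.io.IOException;

import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hive.jdbc.LlapBaseInputFormat;
import org.apache.hive.jdbc.LlapRowInputFormat;

public class LlapRowExample {
  public static void main(String[] args) throws IOException {
    LlapRowInputFormat inputFormat = new LlapRowInputFormat();

    // Connection parameters go through the JobConf, using the keys
    // defined on LlapBaseInputFormat in this commit.
    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default"); // placeholder
    job.set(LlapBaseInputFormat.USER_KEY, "hive");                                // placeholder
    job.set(LlapBaseInputFormat.PWD_KEY, "");
    job.set(LlapBaseInputFormat.QUERY_KEY, "select col1, col2 from testtab");     // placeholder

    InputSplit[] splits = inputFormat.getSplits(job, 1);
    for (InputSplit split : splits) {
      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
      Row row = reader.createValue();
      while (reader.next(NullWritable.get(), row)) {
        // Values come back as Writables; they can be fetched by index or by column name.
        System.out.println(row.getValue(0) + ", " + row.getValue(1));
      }
      reader.close();
    }
  }
}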
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
index 338930e..366e326 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
@@ -10,8 +10,10 @@ import java.util.HashMap;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.TypeDesc;
+
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.junit.After;
import org.junit.Before;
@@ -32,14 +34,11 @@ public class TestLlapInputSplit {
new SplitLocationInfo("location1", false),
new SplitLocationInfo("location2", false),
};
- ArrayList<FieldSchema> fields = new ArrayList<FieldSchema>();
- fields.add(new FieldSchema("col1", "string", "comment1"));
- fields.add(new FieldSchema("col2", "int", "comment2"));
- HashMap<String, String> properties = new HashMap<String, String>();
- properties.put("key1", "val1");
- Schema schema = new Schema(
- fields,
- properties);
+
+ ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+ colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)));
+ colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
+ Schema schema = new Schema(colDescs);
org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
splitNum,
@@ -94,7 +93,7 @@ public class TestLlapInputSplit {
assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk());
}
assertArrayEquals(split1.getLocations(), split2.getLocations());
- assertEquals(split1.getSchema(), split2.getSchema());
+ assertEquals(split1.getSchema().toString(), split2.getSchema().toString());
assertEquals(split1.getLlapUser(), split2.getLlapUser());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 98daab4..deeac2e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -60,16 +60,16 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapRecordReader;
-import org.apache.hadoop.hive.metastore.ObjectStore;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.apache.hive.jdbc.LlapInputFormat;
+import org.apache.hive.jdbc.LlapBaseInputFormat;
+import org.apache.hive.jdbc.LlapRowInputFormat;
import org.datanucleus.ClassLoaderResolver;
import org.datanucleus.NucleusContext;
@@ -109,8 +109,6 @@ public class TestJdbcWithMiniLlap {
conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ "/tez-site.xml"));
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/llap-daemon-site.xml"));
miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
@@ -202,10 +200,14 @@ public class TestJdbcWithMiniLlap {
String user = System.getProperty("user.name");
String pwd = user;
- LlapInputFormat inputFormat = new LlapInputFormat(url, user, pwd, query);
+ LlapRowInputFormat inputFormat = new LlapRowInputFormat();
// Get splits
JobConf job = new JobConf(conf);
+ job.set(LlapBaseInputFormat.URL_KEY, url);
+ job.set(LlapBaseInputFormat.USER_KEY, user);
+ job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+ job.set(LlapBaseInputFormat.QUERY_KEY, query);
InputSplit[] splits = inputFormat.getSplits(job, numSplits);
assertTrue(splits.length > 0);
@@ -216,10 +218,12 @@ public class TestJdbcWithMiniLlap {
for (InputSplit split : splits) {
System.out.println("Processing split " + split.getLocations());
- RecordReader<NullWritable, Text> reader = inputFormat.getRecordReader(split, job, null);
- if (reader instanceof LlapRecordReader && first) {
- Schema schema = ((LlapRecordReader)reader).getSchema();
+ int numColumns = 2;
+ RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
+ if (reader instanceof LlapRowRecordReader && first) {
+ Schema schema = ((LlapRowRecordReader) reader).getSchema();
System.out.println(""+schema);
+ assertEquals(numColumns, schema.getColumns().size());
}
if (first) {
@@ -228,9 +232,15 @@ public class TestJdbcWithMiniLlap {
first = false;
}
- Text value = reader.createValue();
- while (reader.next(NullWritable.get(), value)) {
- System.out.println(value);
+ Row row = reader.createValue();
+ while (reader.next(NullWritable.get(), row)) {
+ for (int idx = 0; idx < numColumns; idx++) {
+ if (idx > 0) {
+ System.out.print(", ");
+ }
+ System.out.print(row.getValue(idx));
+ }
+ System.out.println("");
++rowCount;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 2be8c30..c99a351 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -70,6 +70,11 @@
<artifactId>hive-service-rpc</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-llap-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- inter-project -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
new file mode 100644
index 0000000..a0ddeaa
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.DriverManager;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.base.Preconditions;
+
+public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+ private static String driverName = "org.apache.hive.jdbc.HiveDriver";
+ private String url; // "jdbc:hive2://localhost:10000/default"
+ private String user; // "hive",
+ private String pwd; // ""
+ private String query;
+
+ public static final String URL_KEY = "llap.if.hs2.connection";
+ public static final String QUERY_KEY = "llap.if.query";
+ public static final String USER_KEY = "llap.if.user";
+ public static final String PWD_KEY = "llap.if.pwd";
+
+ public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+
+ private Connection con;
+ private Statement stmt;
+
+ public LlapBaseInputFormat(String url, String user, String pwd, String query) {
+ this.url = url;
+ this.user = user;
+ this.pwd = pwd;
+ this.query = query;
+ }
+
+ public LlapBaseInputFormat() {}
+
+
+ @Override
+ public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ LlapInputSplit llapSplit = (LlapInputSplit) split;
+ return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ List<InputSplit> ins = new ArrayList<InputSplit>();
+
+ if (url == null) url = job.get(URL_KEY);
+ if (query == null) query = job.get(QUERY_KEY);
+ if (user == null) user = job.get(USER_KEY);
+ if (pwd == null) pwd = job.get(PWD_KEY);
+
+ if (url == null || query == null) {
+ throw new IllegalStateException();
+ }
+
+ try {
+ Class.forName(driverName);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ try {
+ con = DriverManager.getConnection(url,user,pwd);
+ stmt = con.createStatement();
+ String sql = String.format(SPLIT_QUERY, query, numSplits);
+ ResultSet res = stmt.executeQuery(sql);
+ while (res.next()) {
+ // deserialize split
+ DataInput in = new DataInputStream(res.getBinaryStream(3));
+ InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
+ is.readFields(in);
+ ins.add(new LlapInputSplit(is, res.getString(1)));
+ }
+
+ res.close();
+ stmt.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return ins.toArray(new InputSplit[ins.size()]);
+ }
+
+ public void close() {
+ try {
+ con.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
index 7ed0a0e..4c3c3ab 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
@@ -49,10 +49,9 @@ import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.hive.llap.LlapRecordReader;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.Schema;
public class LlapDump {
@@ -98,7 +97,7 @@ public class LlapDump {
System.out.println("user: "+user);
System.out.println("query: "+query);
- LlapInputFormat format = new LlapInputFormat(url, user, pwd, query);
+ LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query);
JobConf job = new JobConf();
InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits));
@@ -113,8 +112,8 @@ public class LlapDump {
LOG.info("Processing input split s from " + Arrays.toString(s.getLocations()));
RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null);
- if (reader instanceof LlapRecordReader && first) {
- Schema schema = ((LlapRecordReader)reader).getSchema();
+ if (reader instanceof LlapBaseRecordReader && first) {
+ Schema schema = ((LlapBaseRecordReader)reader).getSchema();
System.out.println(""+schema);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
deleted file mode 100644
index 9a7c16d..0000000
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.jdbc;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import java.sql.SQLException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.sql.DriverManager;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.ByteArrayInputStream;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
-
-import com.google.common.base.Preconditions;
-
-public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
-
- private static String driverName = "org.apache.hive.jdbc.HiveDriver";
- private String url; // "jdbc:hive2://localhost:10000/default"
- private String user; // "hive",
- private String pwd; // ""
- private String query;
-
- public final String URL_KEY = "llap.if.hs2.connection";
- public final String QUERY_KEY = "llap.if.query";
- public final String USER_KEY = "llap.if.user";
- public final String PWD_KEY = "llap.if.pwd";
-
- public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
-
- private Connection con;
- private Statement stmt;
-
- public LlapInputFormat(String url, String user, String pwd, String query) {
- this.url = url;
- this.user = user;
- this.pwd = pwd;
- this.query = query;
- }
-
- public LlapInputFormat() {}
-
-
- @Override
- public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
- LlapInputSplit llapSplit = (LlapInputSplit) split;
- return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
- }
-
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- List<InputSplit> ins = new ArrayList<InputSplit>();
-
- if (url == null) url = job.get(URL_KEY);
- if (query == null) query = job.get(QUERY_KEY);
- if (user == null) user = job.get(USER_KEY);
- if (pwd == null) pwd = job.get(PWD_KEY);
-
- if (url == null || query == null) {
- throw new IllegalStateException();
- }
-
- try {
- Class.forName(driverName);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
-
- try {
- con = DriverManager.getConnection(url,user,pwd);
- stmt = con.createStatement();
- String sql = String.format(SPLIT_QUERY, query, numSplits);
- ResultSet res = stmt.executeQuery(sql);
- while (res.next()) {
- // deserialize split
- DataInput in = new DataInputStream(res.getBinaryStream(3));
- InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
- is.readFields(in);
- ins.add(new LlapInputSplit(is, res.getString(1)));
- }
-
- res.close();
- stmt.close();
- } catch (Exception e) {
- throw new IOException(e);
- }
- return ins.toArray(new InputSplit[ins.size()]);
- }
-
- public void close() {
- try {
- con.close();
- } catch (Exception e) {
- // ignore
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
new file mode 100644
index 0000000..1cca66a
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
@@ -0,0 +1,34 @@
+package org.apache.hive.jdbc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
+ LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ return baseInputFormat.getSplits(job, numSplits);
+ }
+
+ @Override
+ public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split;
+ LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+ return new LlapRowRecordReader(job, reader.getSchema(), reader);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/pom.xml
----------------------------------------------------------------------
diff --git a/llap-common/pom.xml b/llap-common/pom.xml
index 5343479..ceac83b 100644
--- a/llap-common/pom.xml
+++ b/llap-common/pom.xml
@@ -39,6 +39,11 @@
<artifactId>hive-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- inter-project -->
<dependency>
@@ -58,6 +63,22 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
<version>${tez.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java b/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java
new file mode 100644
index 0000000..9621978
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+public class FieldDesc implements Writable {
+ private String name;
+ private TypeDesc typeDesc;
+
+ public FieldDesc() {
+ typeDesc = new TypeDesc();
+ }
+
+ public FieldDesc(String name, TypeDesc typeDesc) {
+ this.name = name;
+ this.typeDesc = typeDesc;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public TypeDesc getTypeDesc() {
+ return typeDesc;
+ }
+
+ @Override
+ public String toString() {
+ return getName() + ":" + getTypeDesc().toString();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(name);
+ typeDesc.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ name = in.readUTF();
+ typeDesc.readFields(in);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
new file mode 100644
index 0000000..4e000ff
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -0,0 +1,155 @@
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.TypeDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
+
+ Configuration conf;
+ RecordReader<NullWritable, Text> reader;
+ Schema schema;
+ SerDe serde;
+ final Text textData = new Text();
+
+ public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) {
+ this.conf = conf;
+ this.schema = schema;
+ this.reader = reader;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public Row createValue() {
+ return new Row(schema);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public boolean next(NullWritable key, Row value) throws IOException {
+ Preconditions.checkArgument(value != null);
+
+ if (serde == null) {
+ try {
+ serde = initSerDe(conf);
+ } catch (SerDeException err) {
+ throw new IOException(err);
+ }
+ }
+
+ boolean hasNext = reader.next(key, textData);
+ if (hasNext) {
+ // Deserialize Text to column values, and populate the row record
+ Object rowObj;
+ try {
+ StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+ rowObj = serde.deserialize(textData);
+ List<? extends StructField> colFields = rowOI.getAllStructFieldRefs();
+ for (int idx = 0; idx < colFields.size(); ++idx) {
+ StructField field = colFields.get(idx);
+ Object colValue = rowOI.getStructFieldData(rowObj, field);
+ Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE,
+ "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName());
+
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector();
+ // char/varchar special cased here since the row record handles them using Text
+ switch (poi.getPrimitiveCategory()) {
+ case CHAR:
+ value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue());
+ break;
+ case VARCHAR:
+ value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue());
+ break;
+ default:
+ value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue));
+ break;
+ }
+ }
+ } catch (SerDeException err) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error deserializing row from text: " + textData);
+ }
+ throw new IOException("Error deserializing row data", err);
+ }
+ }
+
+ return hasNext;
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ protected SerDe initSerDe(Configuration conf) throws SerDeException {
+ Properties props = new Properties();
+ StringBuffer columnsBuffer = new StringBuffer();
+ StringBuffer typesBuffer = new StringBuffer();
+ boolean isFirst = true;
+ for (FieldDesc colDesc : schema.getColumns()) {
+ if (!isFirst) {
+ columnsBuffer.append(',');
+ typesBuffer.append(',');
+ }
+ columnsBuffer.append(colDesc.getName());
+ typesBuffer.append(colDesc.getTypeDesc().toString());
+ isFirst = false;
+ }
+ String columns = columnsBuffer.toString();
+ String types = typesBuffer.toString();
+ props.put(serdeConstants.LIST_COLUMNS, columns);
+ props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+ SerDe serde = new LazySimpleSerDe();
+ serde.initialize(conf, props);
+
+ return serde;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
new file mode 100644
index 0000000..a84fadc
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+
+public class Row {
+ private final Schema schema;
+ private final Writable[] colValues;
+ private final boolean[] nullIndicators;
+ private Map<String, Integer> nameToIndexMapping;
+
+ public Row(Schema schema) {
+ this.schema = schema;
+ this.colValues = new Writable[schema.getColumns().size()];
+ this.nullIndicators = new boolean[schema.getColumns().size()];
+ this.nameToIndexMapping = new HashMap<String, Integer>(schema.getColumns().size());
+
+ List<FieldDesc> colDescs = schema.getColumns();
+ for (int idx = 0; idx < colDescs.size(); ++idx) {
+ FieldDesc colDesc = colDescs.get(idx);
+ nameToIndexMapping.put(colDesc.getName(), idx);
+ colValues[idx] = createWritableForType(colDesc.getTypeDesc());
+ }
+ }
+
+ public Writable getValue(int colIndex) {
+ if (nullIndicators[colIndex]) {
+ return null;
+ }
+ return colValues[colIndex];
+ }
+
+ public Writable getValue(String colName) {
+ Integer idx = nameToIndexMapping.get(colName);
+ Preconditions.checkArgument(idx != null);
+ return getValue(idx);
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ void setValue(int colIdx, Writable value) {
+ Preconditions.checkArgument(colIdx <= schema.getColumns().size());
+
+ if (value == null) {
+ nullIndicators[colIdx] = true;
+ } else {
+ nullIndicators[colIdx] = false;
+ FieldDesc colDesc = schema.getColumns().get(colIdx);
+ switch (colDesc.getTypeDesc().getType()) {
+ case BOOLEAN:
+ ((BooleanWritable) colValues[colIdx]).set(((BooleanWritable) value).get());
+ break;
+ case TINYINT:
+ ((ByteWritable) colValues[colIdx]).set(((ByteWritable) value).get());
+ break;
+ case SMALLINT:
+ ((ShortWritable) colValues[colIdx]).set(((ShortWritable) value).get());
+ break;
+ case INT:
+ ((IntWritable) colValues[colIdx]).set(((IntWritable) value).get());
+ break;
+ case BIGINT:
+ ((LongWritable) colValues[colIdx]).set(((LongWritable) value).get());
+ break;
+ case FLOAT:
+ ((FloatWritable) colValues[colIdx]).set(((FloatWritable) value).get());
+ break;
+ case DOUBLE:
+ ((DoubleWritable) colValues[colIdx]).set(((DoubleWritable) value).get());
+ break;
+ case STRING:
+ // Just handle char/varchar as Text
+ case CHAR:
+ case VARCHAR:
+ ((Text) colValues[colIdx]).set((Text) value);
+ break;
+ case DATE:
+ ((DateWritable) colValues[colIdx]).set((DateWritable) value);
+ break;
+ case TIMESTAMP:
+ ((TimestampWritable) colValues[colIdx]).set((TimestampWritable) value);
+ break;
+ case BINARY:
+ ((BytesWritable) colValues[colIdx]).set(((BytesWritable) value));
+ break;
+ case DECIMAL:
+ ((HiveDecimalWritable) colValues[colIdx]).set((HiveDecimalWritable) value);
+ break;
+ }
+ }
+ }
+
+ private Writable createWritableForType(TypeDesc typeDesc) {
+ switch (typeDesc.getType()) {
+ case BOOLEAN:
+ return new BooleanWritable();
+ case TINYINT:
+ return new ByteWritable();
+ case SMALLINT:
+ return new ShortWritable();
+ case INT:
+ return new IntWritable();
+ case BIGINT:
+ return new LongWritable();
+ case FLOAT:
+ return new FloatWritable();
+ case DOUBLE:
+ return new DoubleWritable();
+ case STRING:
+ // Just handle char/varchar as Text
+ case CHAR:
+ case VARCHAR:
+ return new Text();
+ case DATE:
+ return new DateWritable();
+ case TIMESTAMP:
+ return new TimestampWritable();
+ case BINARY:
+ return new BytesWritable();
+ case DECIMAL:
+ return new HiveDecimalWritable();
+ default:
+ throw new RuntimeException("Cannot create writable for " + typeDesc.getType());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java b/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java
new file mode 100644
index 0000000..c1bf4ea
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Writable;
+
+public class Schema implements Writable {
+
+ private final List<FieldDesc> columns;
+
+ public Schema(List<FieldDesc> columns) {
+ this.columns = columns;
+ }
+
+ public Schema() {
+ columns = new ArrayList<FieldDesc>();
+ }
+
+ public List<FieldDesc> getColumns() {
+ return columns;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ boolean first = true;
+ for (FieldDesc colDesc : getColumns()) {
+ if (!first) {
+ sb.append(",");
+ }
+ sb.append(colDesc.toString());
+ first = false;
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(columns.size());
+ for (FieldDesc column : columns) {
+ column.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int numColumns = in.readInt();
+ columns.clear();
+ for (int idx = 0; idx < numColumns; ++idx) {
+ FieldDesc colDesc = new FieldDesc();
+ colDesc.readFields(in);
+ columns.add(colDesc);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
new file mode 100644
index 0000000..dda5928
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+public class TypeDesc implements Writable {
+ public static enum Type {
+ BOOLEAN,
+ TINYINT,
+ SMALLINT,
+ INT,
+ BIGINT,
+ FLOAT,
+ DOUBLE,
+ STRING,
+ CHAR,
+ VARCHAR,
+ DATE,
+ TIMESTAMP,
+ BINARY,
+ DECIMAL,
+ }
+
+ private TypeDesc.Type type;
+ private int precision;
+ private int scale;
+
+ // For types with no type qualifiers
+ public TypeDesc(TypeDesc.Type type) {
+ this(type, 0, 0);
+ }
+
+ // For decimal types
+ public TypeDesc(TypeDesc.Type type, int precision, int scale) {
+ this.type = type;
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ // For char/varchar types
+ public TypeDesc(TypeDesc.Type type, int precision) {
+ this(type, precision, 0);
+ }
+
+ // Should be used for serialization only
+ public TypeDesc() {
+ this(TypeDesc.Type.INT, 0, 0);
+ }
+
+ public TypeDesc.Type getType() {
+ return type;
+ }
+
+ public int getPrecision() {
+ return precision;
+ }
+
+ public int getScale() {
+ return scale;
+ }
+
+ @Override
+ public String toString() {
+ switch (type) {
+ case DECIMAL:
+ return type.name().toLowerCase() + "(" + precision + "," + scale + ")";
+ case CHAR:
+ case VARCHAR:
+ return type.name().toLowerCase() + "(" + precision + ")";
+ default:
+ return type.name().toLowerCase();
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(type.name());
+ out.writeInt(precision);
+ out.writeInt(scale);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ type = TypeDesc.Type.valueOf(in.readUTF());
+ precision = in.readInt();
+ scale = in.readInt();
+ }
+}
\ No newline at end of file
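With this commit, Schema is serialized via Hadoop's Writable interface rather than Thrift (see the LlapInputSplit changes below). A minimal round-trip sketch under that contract; the harness class and column choices are illustrative, while the Schema/FieldDesc/TypeDesc API is from this diff:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.hive.llap.TypeDesc;

public class SchemaRoundTrip {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema(Arrays.asList(
        new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)),
        new FieldDesc("col2", new TypeDesc(TypeDesc.Type.DECIMAL, 10, 2))));

    // Serialize with write(), then read back into a fresh instance.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    schema.write(new DataOutputStream(bos));
    Schema copy = new Schema();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

    // As in TestLlapInputSplit above, compare via toString(), since
    // Schema does not override equals().
    System.out.println(schema);                                    // col1:string,col2:decimal(10,2)
    System.out.println(schema.toString().equals(copy.toString())); // true
  }
}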
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java b/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java
new file mode 100644
index 0000000..d4e68f4
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestRow {
+
+ @Test
+ public void testUsage() {
+ Schema schema = createTestSchema();
+ Row row = new Row(schema);
+
+ Random rand = new Random();
+ int iterations = 100;
+ Text col0 = new Text();
+ IntWritable col1 = new IntWritable();
+ for (int idx = 0; idx < iterations; ++idx) {
+ // Set the row values
+ boolean isNullCol0 = (rand.nextDouble() <= 0.25);
+ col0.set(RandomStringUtils.random(10));
+ row.setValue(0, isNullCol0 ? null : col0);
+
+ boolean isNullCol1 = (rand.nextDouble() <= 0.25);
+ col1.set(rand.nextInt());
+ row.setValue(1, isNullCol1 ? null : col1);
+
+ // Validate the row values
+ if (isNullCol0) {
+ assertTrue(row.getValue(0) == null);
+ assertTrue(row.getValue("col0") == null);
+ } else {
+ assertTrue(row.getValue(0) != null);
+ assertTrue(col0 != row.getValue(0));
+ assertEquals(col0, row.getValue(0));
+ assertEquals(col0, row.getValue("col0"));
+ }
+
+ if (isNullCol1) {
+ assertTrue(row.getValue(1) == null);
+ assertTrue(row.getValue("col1") == null);
+ } else {
+ assertTrue(row.getValue(1) != null);
+ assertTrue(col1 != row.getValue(1));
+ assertEquals(col1, row.getValue(1));
+ assertEquals(col1, row.getValue("col1"));
+ }
+ }
+ }
+
+ private Schema createTestSchema() {
+ List<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+
+ colDescs.add(new FieldDesc("col0",
+ new TypeDesc(TypeDesc.Type.STRING)));
+
+ colDescs.add(new FieldDesc("col1",
+ new TypeDesc(TypeDesc.Type.INT)));
+
+ Schema schema = new Schema(colDescs);
+ return schema;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index aaca7d6..0930d60 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -30,7 +30,7 @@ import com.google.protobuf.ByteString;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapRecordReader.ReaderEvent;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
@@ -74,7 +74,6 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
-
public LlapInputFormat() {
}
@@ -135,7 +134,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
LOG.info("Registered id: " + id);
- LlapRecordReader recordReader = new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+ LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
umbilicalResponder.setRecordReader(recordReader);
return recordReader;
}
@@ -276,7 +275,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
}
private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
- protected LlapRecordReader recordReader = null;
+ protected LlapBaseRecordReader recordReader = null;
protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
public LlapRecordReaderTaskUmbilicalExternalResponder() {
@@ -285,7 +284,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
@Override
public void submissionFailed(String fragmentId, Throwable throwable) {
try {
- sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+ sendOrQueueEvent(ReaderEvent.errorEvent(
"Received submission failed event for fragment ID " + fragmentId));
} catch (Exception err) {
LOG.error("Error during heartbeat responder:", err);
@@ -301,11 +300,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
try {
switch (eventType) {
case TASK_ATTEMPT_COMPLETED_EVENT:
- sendOrQueueEvent(LlapRecordReader.ReaderEvent.doneEvent());
+ sendOrQueueEvent(ReaderEvent.doneEvent());
break;
case TASK_ATTEMPT_FAILED_EVENT:
TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
- sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
+ sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
break;
case TASK_STATUS_UPDATE_EVENT:
// If we want to handle counters
@@ -323,7 +322,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
@Override
public void taskKilled(TezTaskAttemptID taskAttemptId) {
try {
- sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+ sendOrQueueEvent(ReaderEvent.errorEvent(
"Received task killed event for task ID " + taskAttemptId));
} catch (Exception err) {
LOG.error("Error during heartbeat responder:", err);
@@ -333,18 +332,18 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
@Override
public void heartbeatTimeout(String taskAttemptId) {
try {
- sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+ sendOrQueueEvent(ReaderEvent.errorEvent(
"Timed out waiting for heartbeat for task ID " + taskAttemptId));
} catch (Exception err) {
LOG.error("Error during heartbeat responder:", err);
}
}
- public synchronized LlapRecordReader getRecordReader() {
+ public synchronized LlapBaseRecordReader getRecordReader() {
return recordReader;
}
- public synchronized void setRecordReader(LlapRecordReader recordReader) {
+ public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
this.recordReader = recordReader;
if (recordReader == null) {
@@ -353,7 +352,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
// If any events were queued by the responder, give them to the record reader now.
while (!queuedEvents.isEmpty()) {
- LlapRecordReader.ReaderEvent readerEvent = queuedEvents.poll();
+ ReaderEvent readerEvent = queuedEvents.poll();
LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
recordReader.handleEvent(readerEvent);
}
@@ -365,8 +364,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
* since we don't want to drop these events.
* @param readerEvent
*/
- protected synchronized void sendOrQueueEvent(LlapRecordReader.ReaderEvent readerEvent) {
- LlapRecordReader recordReader = getRecordReader();
+ protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
+ LlapBaseRecordReader recordReader = getRecordReader();
if (recordReader != null) {
recordReader.handleEvent(readerEvent);
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
new file mode 100644
index 0000000..7073280
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
+
+ DataInputStream din;
+ Schema schema;
+ Class<V> clazz;
+
+
+ protected Thread readerThread = null;
+ protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+ public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) {
+ din = new DataInputStream(in);
+ this.schema = schema;
+ this.clazz = clazz;
+ this.readerThread = Thread.currentThread();
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public void close() throws IOException {
+ din.close();
+ }
+
+ @Override
+ public long getPos() { return 0; }
+
+ @Override
+ public float getProgress() { return 0f; }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public V createValue() {
+ try {
+ return clazz.newInstance();
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean next(NullWritable key, V value) throws IOException {
+ try {
+ // Need a way to know what thread to interrupt, since this is a blocking thread.
+ setReaderThread(Thread.currentThread());
+
+ value.readFields(din);
+ return true;
+ } catch (EOFException eof) {
+ // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case DONE:
+ break;
+ default:
+ throw new IOException("Expected reader event with done status, but got "
+ + event.getEventType() + " with message " + event.getMessage());
+ }
+ return false;
+ } catch (IOException io) {
+ if (Thread.interrupted()) {
+ // Either we were interrupted by one of:
+ // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
+ // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+ // Either way we should not try to block trying to read the reader events queue.
+ if (readerEvents.isEmpty()) {
+ // Case 2.
+ throw io;
+ } else {
+ // Case 1. Fail the reader, sending back the error we received from the reader event.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case ERROR:
+ throw new IOException("Received reader event error: " + event.getMessage());
+ default:
+ throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
+ }
+ }
+ } else {
+ // If we weren't interrupted, just propagate the error
+ throw io;
+ }
+ }
+ }
+
+ /**
+ * Define success/error events which are passed to the reader from a different thread.
+ * The reader will check for these events on end of input and interruption of the reader thread.
+ */
+ public static class ReaderEvent {
+ public enum EventType {
+ DONE,
+ ERROR
+ }
+
+ protected final EventType eventType;
+ protected final String message;
+
+ protected ReaderEvent(EventType type, String message) {
+ this.eventType = type;
+ this.message = message;
+ }
+
+ public static ReaderEvent doneEvent() {
+ return new ReaderEvent(EventType.DONE, "");
+ }
+
+ public static ReaderEvent errorEvent(String message) {
+ return new ReaderEvent(EventType.ERROR, message);
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+ }
+
+ public void handleEvent(ReaderEvent event) {
+ switch (event.getEventType()) {
+ case DONE:
+ // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
+ readerEvents.add(event);
+ break;
+ case ERROR:
+ readerEvents.add(event);
+ if (readerThread == null) {
+ throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
+ }
+ // Reader is using a blocking socket .. interrupt it.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
+ }
+ getReaderThread().interrupt();
+ break;
+ default:
+ throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
+ }
+ }
+
+ protected ReaderEvent getReaderEvent() {
+ try {
+ return readerEvents.take();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Unexpected interruption while waiting for a reader event", ie);
+ }
+ }
+
+ protected synchronized void setReaderThread(Thread readerThread) {
+ this.readerThread = readerThread;
+ }
+
+ protected synchronized Thread getReaderThread() {
+ return readerThread;
+ }
+}
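For context, a minimal sketch of how the done/error handshake above can be driven from a second thread. The class and method names are those introduced by this patch; the InputStream wiring and the use of Text as the value type are assumptions made here for illustration only:

    import java.io.InputStream;

    import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
    import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
    import org.apache.hadoop.hive.llap.Schema;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;

    public class ReaderHandshakeSketch {
      // "in" and "schema" are assumed to come from the LLAP output socket and split.
      static void consume(InputStream in, Schema schema) throws Exception {
        final LlapBaseRecordReader<Text> reader =
            new LlapBaseRecordReader<Text>(in, schema, Text.class);

        // In the real flow, a task-status thread reports completion or failure.
        new Thread(new Runnable() {
          @Override
          public void run() {
            // Success: next() drains this event once it hits EOF on the stream.
            reader.handleEvent(ReaderEvent.doneEvent());
            // Failure would instead interrupt the reader blocked on the socket:
            // reader.handleEvent(ReaderEvent.errorEvent("fragment failed"));
          }
        }).start();

        Text value = reader.createValue();
        while (reader.next(NullWritable.get(), value)) {
          System.out.println(value);   // consume each record
        }
        reader.close();
      }
    }

Because a DONE event is only consumed when next() reaches end of input, it is safe to enqueue it before the stream has been drained.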
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 17a0d2d..02aedfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -20,7 +20,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.thrift.TDeserializer;
@@ -93,17 +93,7 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
out.writeUTF(locations[i].getLocation());
}
- byte[] binarySchema;
-
- try {
- TSerializer serializer = new TSerializer();
- byte[] serialzied = serializer.serialize(schema);
- out.writeInt(serialzied.length);
- out.write(serialzied);
- } catch (Exception e) {
- throw new IOException(e);
- }
-
+ schema.write(out);
out.writeUTF(llapUser);
}
@@ -125,17 +115,8 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
locations[i] = new SplitLocationInfo(in.readUTF(), false);
}
- length = in.readInt();
-
- try {
- byte[] schemaBytes = new byte[length];
- in.readFully(schemaBytes);
- TDeserializer tDeserializer = new TDeserializer();
- schema = new Schema();
- tDeserializer.deserialize(schema, schemaBytes);
- } catch (Exception e) {
- throw new IOException(e);
- }
+ schema = new Schema();
+ schema.readFields(in);
llapUser = in.readUTF();
}
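With Schema now implementing Hadoop's Writable contract, the split's schema round-trips through plain write()/readFields() calls instead of Thrift. A minimal sketch of that round trip, using the Schema/FieldDesc/TypeDesc classes added by this patch (the column names here are made up for illustration):

    import java.util.Arrays;

    import org.apache.hadoop.hive.llap.FieldDesc;
    import org.apache.hadoop.hive.llap.Schema;
    import org.apache.hadoop.hive.llap.TypeDesc;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class SchemaRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Schema before = new Schema(Arrays.asList(
            new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)),
            new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT))));

        // Serialize, as LlapInputSplit.write() now does.
        DataOutputBuffer out = new DataOutputBuffer();
        before.write(out);

        // Deserialize, as LlapInputSplit.readFields() now does.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        Schema after = new Schema();
        after.readFields(in);
      }
    }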
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
deleted file mode 100644
index 64e5e69..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.DataInputStream;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.hive.metastore.api.Schema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
- private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
-
- DataInputStream din;
- Schema schema;
- Class<V> clazz;
-
-
- protected Thread readerThread = null;
- protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
-
- public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) {
- din = new DataInputStream(in);
- this.schema = schema;
- this.clazz = clazz;
- this.readerThread = Thread.currentThread();
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- @Override
- public void close() throws IOException {
- din.close();
- }
-
- @Override
- public long getPos() { return 0; }
-
- @Override
- public float getProgress() { return 0f; }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public V createValue() {
- try {
- return clazz.newInstance();
- } catch (Exception e) {
- return null;
- }
- }
-
- @Override
- public boolean next(NullWritable key, V value) throws IOException {
- try {
- // Need a way to know what thread to interrupt, since this is a blocking thread.
- setReaderThread(Thread.currentThread());
-
- value.readFields(din);
- return true;
- } catch (EOFException eof) {
- // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case DONE:
- break;
- default:
- throw new IOException("Expected reader event with done status, but got "
- + event.getEventType() + " with message " + event.getMessage());
- }
- return false;
- } catch (IOException io) {
- if (Thread.interrupted()) {
- // Either we were interrupted by one of:
- // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
- // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
- // Either way we should not try to block trying to read the reader events queue.
- if (readerEvents.isEmpty()) {
- // Case 2.
- throw io;
- } else {
- // Case 1. Fail the reader, sending back the error we received from the reader event.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case ERROR:
- throw new IOException("Received reader event error: " + event.getMessage());
- default:
- throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
- }
- }
- } else {
- // If we weren't interrupted, just propagate the error
- throw io;
- }
- }
- }
-
- /**
- * Define success/error events which are passed to the reader from a different thread.
- * The reader will check for these events on end of input and interruption of the reader thread.
- */
- public static class ReaderEvent {
- public enum EventType {
- DONE,
- ERROR
- }
-
- protected final EventType eventType;
- protected final String message;
-
- protected ReaderEvent(EventType type, String message) {
- this.eventType = type;
- this.message = message;
- }
-
- public static ReaderEvent doneEvent() {
- return new ReaderEvent(EventType.DONE, "");
- }
-
- public static ReaderEvent errorEvent(String message) {
- return new ReaderEvent(EventType.ERROR, message);
- }
-
- public EventType getEventType() {
- return eventType;
- }
-
- public String getMessage() {
- return message;
- }
- }
-
- public void handleEvent(ReaderEvent event) {
- switch (event.getEventType()) {
- case DONE:
- // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
- readerEvents.add(event);
- break;
- case ERROR:
- readerEvents.add(event);
- if (readerThread == null) {
- throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
- }
- // Reader is using a blocking socket .. interrupt it.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
- }
- getReaderThread().interrupt();
- break;
- default:
- throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
- }
- }
-
- protected ReaderEvent getReaderEvent() {
- try {
- ReaderEvent event = readerEvents.take();
- return event;
- } catch (InterruptedException ie) {
- throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
- }
- }
-
- protected synchronized void setReaderThread(Thread readerThread) {
- this.readerThread = readerThread;
- }
-
- protected synchronized Thread getReaderThread() {
- return readerThread;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 6267324..51027a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.TypeDesc;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Driver;
@@ -71,6 +74,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -220,7 +229,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
QueryPlan plan = driver.getPlan();
List<Task<?>> roots = plan.getRootTasks();
- Schema schema = plan.getResultSchema();
+ Schema schema = convertSchema(plan.getResultSchema());
if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
throw new HiveException("Was expecting a single TezTask.");
@@ -255,7 +264,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
plan = driver.getPlan();
roots = plan.getRootTasks();
- schema = plan.getResultSchema();
+ schema = convertSchema(plan.getResultSchema());
if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
throw new HiveException("Was expecting a single TezTask.");
@@ -416,6 +425,78 @@ public class GenericUDTFGetSplits extends GenericUDTF {
}
}
+ private TypeDesc convertTypeString(String typeString) throws HiveException {
+ TypeDesc typeDesc;
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
+ Preconditions.checkState(typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE,
+ "Unsupported non-primitive type " + typeString);
+
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case BOOLEAN:
+ typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
+ break;
+ case BYTE:
+ typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
+ break;
+ case SHORT:
+ typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
+ break;
+ case INT:
+ typeDesc = new TypeDesc(TypeDesc.Type.INT);
+ break;
+ case LONG:
+ typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
+ break;
+ case FLOAT:
+ typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
+ break;
+ case DOUBLE:
+ typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
+ break;
+ case STRING:
+ typeDesc = new TypeDesc(TypeDesc.Type.STRING);
+ break;
+ case CHAR:
+ CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
+ typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
+ break;
+ case VARCHAR:
+ VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
+ typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR, varcharTypeInfo.getLength());
+ break;
+ case DATE:
+ typeDesc = new TypeDesc(TypeDesc.Type.DATE);
+ break;
+ case TIMESTAMP:
+ typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
+ break;
+ case BINARY:
+ typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
+ break;
+ case DECIMAL:
+ DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+ typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL, decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+ break;
+ default:
+ throw new HiveException("Unsupported type " + typeString);
+ }
+
+ return typeDesc;
+ }
+
+ private Schema convertSchema(Object obj) throws HiveException {
+ org.apache.hadoop.hive.metastore.api.Schema schema = (org.apache.hadoop.hive.metastore.api.Schema) obj;
+ List<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+ for (FieldSchema fs : schema.getFieldSchemas()) {
+ String colName = fs.getName();
+ String typeString = fs.getType();
+ TypeDesc typeDesc = convertTypeString(typeString);
+ colDescs.add(new FieldDesc(colName, typeDesc));
+ }
+ return new Schema(colDescs);
+ }
+
@Override
public void close() throws HiveException {
}
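A quick illustration of the qualifier handling that convertTypeString() relies on: TypeInfoUtils parses length, precision, and scale out of the Hive type string, which is how the CHAR/VARCHAR/DECIMAL branches above recover their parameters. This sketch only exercises the serde2 type-info API already imported by the patch:

    import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;

    public class TypeStringSketch {
      public static void main(String[] args) {
        // "varchar(10)" carries its maximum length through the TypeInfo.
        VarcharTypeInfo vc =
            (VarcharTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString("varchar(10)");
        System.out.println(vc.getLength());       // 10

        // "decimal(10,2)" carries precision and scale.
        DecimalTypeInfo dec =
            (DecimalTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString("decimal(10,2)");
        System.out.println(dec.getPrecision());   // 10
        System.out.println(dec.getScale());       // 2
      }
    }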
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 7b516fe..37e21b8 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
@@ -112,10 +113,13 @@ public class TestLlapOutputFormat {
writer.close(null);
InputStream in = socket.getInputStream();
- RecordReader reader = new LlapRecordReader(in, null, Text.class);
+ LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class);
LOG.debug("Have record reader");
+ // Send the done event, which LlapBaseRecordReader expects at end of input
+ reader.handleEvent(ReaderEvent.doneEvent());
+
int count = 0;
while(reader.next(NullWritable.get(), text)) {
LOG.debug(text.toString());