You are viewing a plain-text version of this content. The hyperlink to its canonical version (originally on the word "here") was lost in this plain-text rendering.
Posted to commits@hive.apache.org by kh...@apache.org on 2015/05/06 23:04:38 UTC
hive git commit: HIVE-9845 : HCatSplit repeats information making input split data size huge (Mithun Radhakrishnan via Sushanth Sowmyan)
Repository: hive
Updated Branches:
refs/heads/master dc72c8736 -> 18fb46017
HIVE-9845 : HCatSplit repeats information making input split data size huge (Mithun Radhakrishnan via Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/18fb4601
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/18fb4601
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/18fb4601
Branch: refs/heads/master
Commit: 18fb460179ff48d2c1e65f324799b4315616f14b
Parents: dc72c87
Author: Sushanth Sowmyan <kh...@gmail.com>
Authored: Wed May 6 14:03:37 2015 -0700
Committer: Sushanth Sowmyan <kh...@gmail.com>
Committed: Wed May 6 14:04:32 2015 -0700
----------------------------------------------------------------------
.../hcatalog/mapreduce/HCatBaseInputFormat.java | 20 ++--
.../hive/hcatalog/mapreduce/HCatSplit.java | 21 +---
.../hive/hcatalog/mapreduce/HCatTableInfo.java | 12 ++
.../hive/hcatalog/mapreduce/InputJobInfo.java | 5 +
.../hive/hcatalog/mapreduce/PartInfo.java | 117 +++++++++++++++++--
.../mapreduce/TestHCatOutputFormat.java | 5 +-
6 files changed, 139 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index 55b97dd..adfaf4e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -130,16 +130,6 @@ public abstract class HCatBaseInputFormat
setInputPath(jobConf, partitionInfo.getLocation());
Map<String, String> jobProperties = partitionInfo.getJobProperties();
- HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
- for (HCatFieldSchema field :
- inputJobInfo.getTableInfo().getDataColumns().getFields()) {
- allCols.append(field);
- }
- for (HCatFieldSchema field :
- inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
- allCols.append(field);
- }
-
HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
storageHandler = HCatUtil.getStorageHandler(
@@ -163,9 +153,7 @@ public abstract class HCatBaseInputFormat
inputFormat.getSplits(jobConf, desiredNumSplits);
for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
- splits.add(new HCatSplit(
- partitionInfo,
- split, allCols));
+ splits.add(new HCatSplit(partitionInfo, split));
}
}
@@ -190,6 +178,12 @@ public abstract class HCatBaseInputFormat
HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+ // Ensure PartInfo's TableInfo is initialized.
+ if (partitionInfo.getTableInfo() == null) {
+ partitionInfo.setTableInfo(((InputJobInfo)HCatUtil.deserialize(
+ taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)
+ )).getTableInfo());
+ }
JobContext jobContext = taskContext;
Configuration conf = jobContext.getConfiguration();
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
index bcedb3a..0aa498a 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatSplit.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -44,11 +43,6 @@ public class HCatSplit extends InputSplit
/** The split returned by the underlying InputFormat split. */
private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
- /** The schema for the HCatTable */
- private HCatSchema tableSchema;
-
- private HiveConf hiveConf;
-
/**
* Instantiates a new hcat split.
*/
@@ -60,16 +54,13 @@ public class HCatSplit extends InputSplit
*
* @param partitionInfo the partition info
* @param baseMapRedSplit the base mapred split
- * @param tableSchema the table level schema
*/
public HCatSplit(PartInfo partitionInfo,
- org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
- HCatSchema tableSchema) {
+ org.apache.hadoop.mapred.InputSplit baseMapRedSplit) {
this.partitionInfo = partitionInfo;
// dataSchema can be obtained from partitionInfo.getPartitionSchema()
this.baseMapRedSplit = baseMapRedSplit;
- this.tableSchema = tableSchema;
}
/**
@@ -101,7 +92,8 @@ public class HCatSplit extends InputSplit
* @return the table schema
*/
public HCatSchema getTableSchema() {
- return this.tableSchema;
+ assert this.partitionInfo.getTableInfo() != null : "TableInfo should have been set at this point.";
+ return this.partitionInfo.getTableInfo().getAllColumns();
}
/* (non-Javadoc)
@@ -159,9 +151,6 @@ public class HCatSplit extends InputSplit
} catch (Exception e) {
throw new IOException("Exception from " + baseSplitClassName, e);
}
-
- String tableSchemaString = WritableUtils.readString(input);
- tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
}
/* (non-Javadoc)
@@ -178,10 +167,6 @@ public class HCatSplit extends InputSplit
Writable baseSplitWritable = (Writable) baseMapRedSplit;
//write baseSplit into output
baseSplitWritable.write(output);
-
- //write the table schema into output
- String tableSchemaString = HCatUtil.serialize(tableSchema);
- WritableUtils.writeString(output, tableSchemaString);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
index 13faf15..14c93ab 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatTableInfo.java
@@ -21,10 +21,13 @@ package org.apache.hive.hcatalog.mapreduce;
import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
/**
@@ -112,6 +115,15 @@ public class HCatTableInfo implements Serializable {
}
/**
+ * @return HCatSchema with all columns (i.e. data and partition columns).
+ */
+ public HCatSchema getAllColumns() {
+ List<HCatFieldSchema> allColumns = Lists.newArrayList(dataColumns.getFields());
+ allColumns.addAll(partitionColumns.getFields());
+ return new HCatSchema(allColumns);
+ }
+
+ /**
* @return the storerInfo
*/
public StorerInfo getStorerInfo() {
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index 360e77b..1f23f3f 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -182,5 +182,10 @@ public class InputJobInfo implements Serializable {
ObjectInputStream partInfoReader =
new ObjectInputStream(new InflaterInputStream(ois));
partitions = (List<PartInfo>)partInfoReader.readObject();
+ for (PartInfo partInfo : partitions) {
+ if (partInfo.getTableInfo() == null) {
+ partInfo.setTableInfo(this.tableInfo);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
index 651a9a0..fca0a92 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/PartInfo.java
@@ -18,27 +18,32 @@
*/
package org.apache.hive.hcatalog.mapreduce;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** The Class used to serialize the partition information read from the metadata server that maps to a partition. */
public class PartInfo implements Serializable {
+ private static Logger LOG = LoggerFactory.getLogger(PartInfo.class);
/** The serialization version */
private static final long serialVersionUID = 1L;
- /** The partition schema. */
- private final HCatSchema partitionSchema;
+ /** The partition data-schema. */
+ private HCatSchema partitionSchema;
/** The information about which input storage handler to use */
- private final String storageHandlerClassName;
- private final String inputFormatClassName;
- private final String outputFormatClassName;
- private final String serdeClassName;
+ private String storageHandlerClassName;
+ private String inputFormatClassName;
+ private String outputFormatClassName;
+ private String serdeClassName;
/** HCat-specific properties set at the partition */
private final Properties hcatProperties;
@@ -52,8 +57,11 @@ public class PartInfo implements Serializable {
/** Job properties associated with this parition */
Map<String, String> jobProperties;
- /** the table info associated with this partition */
- HCatTableInfo tableInfo;
+ /**
+ * The table info associated with this partition.
+ * Not serialized per PartInfo instance. Constant, per table.
+ */
+ transient HCatTableInfo tableInfo;
/**
* Instantiates a new hcat partition info.
@@ -162,4 +170,97 @@ public class PartInfo implements Serializable {
public HCatTableInfo getTableInfo() {
return tableInfo;
}
+
+ void setTableInfo(HCatTableInfo thatTableInfo) {
+ this.tableInfo = thatTableInfo;
+
+ if (partitionSchema == null) {
+ partitionSchema = tableInfo.getDataColumns();
+ }
+
+ if (storageHandlerClassName == null) {
+ storageHandlerClassName = tableInfo.getStorerInfo().getStorageHandlerClass();
+ }
+
+ if (inputFormatClassName == null) {
+ inputFormatClassName = tableInfo.getStorerInfo().getIfClass();
+ }
+
+ if (outputFormatClassName == null) {
+ outputFormatClassName = tableInfo.getStorerInfo().getOfClass();
+ }
+
+ if (serdeClassName == null) {
+ serdeClassName = tableInfo.getStorerInfo().getSerdeClass();
+ }
+ }
+
+ /**
+ * Serialization method. Suppresses serialization of redundant information that's already
+ * available from TableInfo.
+ */
+ private void writeObject(ObjectOutputStream oos)
+ throws IOException {
+ // Suppress commonality with TableInfo.
+
+ assert tableInfo != null : "TableInfo can't be null at this point.";
+
+ if (partitionSchema != null) {
+ if (partitionSchema.equals(tableInfo.getDataColumns())) {
+ partitionSchema = null;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Can't suppress data-schema. Partition-schema and table-schema seem to differ! "
+ + " partitionSchema: " + partitionSchema.getFields()
+ + " tableSchema: " + tableInfo.getDataColumns());
+ }
+ }
+ }
+
+ if (storageHandlerClassName != null) {
+ if (storageHandlerClassName.equals(tableInfo.getStorerInfo().getStorageHandlerClass())) {
+ storageHandlerClassName = null;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition's storageHandler (" + storageHandlerClassName + ") " +
+ "differs from table's storageHandler (" + tableInfo.getStorerInfo().getStorageHandlerClass() + ").");
+ }
+ }
+ }
+
+ if (inputFormatClassName != null) {
+ if (inputFormatClassName.equals(tableInfo.getStorerInfo().getIfClass())) {
+ inputFormatClassName = null;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition's InputFormat (" + inputFormatClassName + ") " +
+ "differs from table's InputFormat (" + tableInfo.getStorerInfo().getIfClass() + ").");
+ }
+ }
+ }
+
+ if (outputFormatClassName != null) {
+ if (outputFormatClassName.equals(tableInfo.getStorerInfo().getOfClass())) {
+ outputFormatClassName = null;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition's OutputFormat (" + outputFormatClassName + ") " +
+ "differs from table's OutputFormat (" + tableInfo.getStorerInfo().getOfClass() + ").");
+ }
+ }
+ }
+
+ if (serdeClassName != null) {
+ if (serdeClassName.equals(tableInfo.getStorerInfo().getSerdeClass())) {
+ serdeClassName = null;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition's SerDe (" + serdeClassName + ") " +
+ "differs from table's SerDe (" + tableInfo.getStorerInfo().getSerdeClass() + ").");
+ }
+ }
+ }
+
+ oos.defaultWriteObject();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/18fb4601/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
index add9d41..f716da9 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Lists;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
@@ -106,7 +107,7 @@ public class TestHCatOutputFormat extends TestCase {
tbl.setDbName(dbName);
tbl.setTableName(tblName);
StorageDescriptor sd = new StorageDescriptor();
- sd.setCols(fields);
+ sd.setCols(Lists.newArrayList(new FieldSchema("data_column", serdeConstants.STRING_TYPE_NAME, "")));
tbl.setSd(sd);
//sd.setLocation("hdfs://tmp");
@@ -151,7 +152,7 @@ public class TestHCatOutputFormat extends TestCase {
assertEquals(1, jobInfo.getPartitionValues().size());
assertEquals("p1", jobInfo.getPartitionValues().get("colname"));
assertEquals(1, jobInfo.getTableInfo().getDataColumns().getFields().size());
- assertEquals("colname", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
+ assertEquals("data_column", jobInfo.getTableInfo().getDataColumns().getFields().get(0).getName());
publishTest(job);
}