You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/02 04:37:49 UTC
[14/22] hive git commit: HIVE-11972 : [Refactor] Improve determination of dynamic partitioning columns in FileSink Operator (Ashutosh Chauhan via Prasanth J)
HIVE-11972 : [Refactor] Improve determination of dynamic partitioning columns in FileSink Operator (Ashutosh Chauhan via Prasanth J)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/24988f77
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/24988f77
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/24988f77
Branch: refs/heads/llap
Commit: 24988f77f2898bbcd91f5665b865bcc251e3cade
Parents: 522bb60
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Sat Sep 26 12:19:00 2015 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Oct 1 11:41:53 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/FileSinkOperator.java | 19 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 17 +
.../optimizer/ConstantPropagateProcFactory.java | 11 +-
.../hive/ql/optimizer/GenMapRedUtils.java | 10 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 30 +-
.../hive/ql/plan/DynamicPartitionCtx.java | 27 --
.../hive/ql/exec/TestFileSinkOperator.java | 384 ++++++++++++-------
7 files changed, 284 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 2604d5d..39944a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -493,24 +493,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has "
+ inputObjInspectors.length;
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
- // remove the last dpMapping.size() columns from the OI
- List<? extends StructField> fieldOI = soi.getAllStructFieldRefs();
- ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>();
- ArrayList<String> newFieldsName = new ArrayList<String>();
- this.dpStartCol = 0;
- for (StructField sf : fieldOI) {
- String fn = sf.getFieldName();
- if (!dpCtx.getInputToDPCols().containsKey(fn)) {
- newFieldsOI.add(sf.getFieldObjectInspector());
- newFieldsName.add(sf.getFieldName());
- this.dpStartCol++;
- } else {
- // once we found the start column for partition column we are done
- break;
- }
- }
- assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty";
-
+ this.dpStartCol = Utilities.getDPColOffset(conf);
this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
this.dpVals = new ArrayList<String>(numDynParts);
this.dpWritables = new ArrayList<Object>(numDynParts);
http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index bcf85a4..5b21af9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -119,6 +119,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -3916,4 +3917,20 @@ public final class Utilities {
HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD, "");
}
}
+
+ public static int getDPColOffset(FileSinkDesc conf) {
+
+ if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // For deletes, there is only ROW__ID in non-partitioning, non-bucketing columns.
+ //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details.
+ return 1;
+ } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+ // For updates, ROW__ID is an extra column at index 0.
+ //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details.
+ return getColumnNames(conf.getTableInfo().getProperties()).size() + 1;
+ } else {
+ return getColumnNames(conf.getTableInfo().getProperties()).size();
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
index 5c6a6df..25156b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
@@ -843,7 +843,7 @@ public final class ConstantPropagateProcFactory {
}
}
if (constant.getTypeInfo().getCategory() != Category.PRIMITIVE) {
- // nested complex types cannot be folded cleanly
+ // nested complex types cannot be folded cleanly
return null;
}
Object value = constant.getValue();
@@ -1163,16 +1163,15 @@ public final class ConstantPropagateProcFactory {
DynamicPartitionCtx dpCtx = fsdesc.getDynPartCtx();
if (dpCtx != null) {
- // If all dynamic partitions are propagated as constant, remove DP.
- Set<String> inputs = dpCtx.getInputToDPCols().keySet();
-
// Assume only 1 parent for FS operator
Operator<? extends Serializable> parent = op.getParentOperators().get(0);
Map<ColumnInfo, ExprNodeDesc> parentConstants = cppCtx.getPropagatedConstants(parent);
RowSchema rs = parent.getSchema();
boolean allConstant = true;
- for (String input : inputs) {
- ColumnInfo ci = rs.getColumnInfo(input);
+ int dpColStartIdx = Utilities.getDPColOffset(fsdesc);
+ List<ColumnInfo> colInfos = rs.getSignature();
+ for (int i = dpColStartIdx; i < colInfos.size(); i++) {
+ ColumnInfo ci = colInfos.get(i);
if (parentConstants.get(ci) == null) {
allConstant = false;
break;
http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 02fbdfe..c696fd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -497,9 +497,6 @@ public final class GenMapRedUtils {
partsList = PartitionPruner.prune(tsOp, parseCtx, alias_id);
} catch (SemanticException e) {
throw e;
- } catch (HiveException e) {
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new SemanticException(e.getMessage(), e);
}
}
@@ -990,7 +987,7 @@ public final class GenMapRedUtils {
fileSinkOp.setParentOperators(Utilities.makeList(parent));
// Create a dummy TableScanOperator for the file generated through fileSinkOp
- TableScanOperator tableScanOp = (TableScanOperator) createTemporaryTableScanOperator(
+ TableScanOperator tableScanOp = createTemporaryTableScanOperator(
parent.getSchema());
// Connect this TableScanOperator to child.
@@ -1235,19 +1232,16 @@ public final class GenMapRedUtils {
// adding DP ColumnInfo to the RowSchema signature
ArrayList<ColumnInfo> signature = inputRS.getSignature();
String tblAlias = fsInputDesc.getTableInfo().getTableName();
- LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
for (String dpCol : dpCtx.getDPColNames()) {
ColumnInfo colInfo = new ColumnInfo(dpCol,
TypeInfoFactory.stringTypeInfo, // all partition column type should be string
tblAlias, true); // partition column is virtual column
signature.add(colInfo);
- colMap.put(dpCol, dpCol); // input and output have the same column name
}
inputRS.setSignature(signature);
// create another DynamicPartitionCtx, which has a different input-to-DP column mapping
DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
- dpCtx2.setInputToDPCols(colMap);
fsOutputDesc.setDynPartCtx(dpCtx2);
// update the FileSinkOperator to include partition columns
@@ -1896,7 +1890,7 @@ public final class GenMapRedUtils {
"Partition Names, " + Arrays.toString(partNames) + " don't match partition Types, "
+ Arrays.toString(partTypes));
- Map<String, String> typeMap = new HashMap();
+ Map<String, String> typeMap = new HashMap<>();
for (int i = 0; i < partNames.length; i++) {
String previousValue = typeMap.put(partNames[i], partTypes[i]);
Preconditions.checkArgument(previousValue == null, "Partition columns configuration is inconsistent. "
http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index dbc6d8f..4bec228 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -736,7 +736,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
Path dataDir = null;
if(!qb.getEncryptedTargetTablePaths().isEmpty()) {
//currently only Insert into T values(...) is supported thus only 1 values clause
- //and only 1 target table are possible. If/when support for
+ //and only 1 target table are possible. If/when support for
//select ... from values(...) is added an insert statement may have multiple
//encrypted target tables.
dataDir = ctx.getMRTmpPath(qb.getEncryptedTargetTablePaths().get(0).toUri());
@@ -1556,7 +1556,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
for (String alias : tabAliases) {
String tab_name = qb.getTabNameForAlias(alias);
-
+
// we first look for this alias from CTE, and then from catalog.
/*
* if this s a CTE reference: Add its AST as a SubQuery to this QB.
@@ -6830,30 +6830,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
.getColumnInfos()), input), rowResolver);
input.setColumnExprMap(colExprMap);
}
-
- rowFields = opParseCtx.get(input).getRowResolver()
- .getColumnInfos();
- if (deleting()) {
- // Figure out if we have partition columns in the list or not. If so,
- // add them into the mapping. Partition columns will be located after the row id.
- if (rowFields.size() > 1) {
- // This means we have partition columns to deal with, so set up the mapping from the
- // input to the partition columns.
- dpCtx.mapInputToDP(rowFields.subList(1, rowFields.size()));
- }
- } else if (updating()) {
- // In this case we expect the number of in fields to exceed the number of out fields by one
- // (for the ROW__ID virtual column). If there are more columns than this,
- // then the extras are for dynamic partitioning
- if (dynPart && dpCtx != null) {
- dpCtx.mapInputToDP(rowFields.subList(tableFields.size() + 1, rowFields.size()));
- }
- } else {
- if (dynPart && dpCtx != null) {
- // create the mapping from input ExprNode to dest table DP column
- dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
- }
- }
return input;
}
@@ -10105,7 +10081,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return;
}
for (Node child : node.getChildren()) {
- //each insert of multi insert looks like
+ //each insert of multi insert looks like
//(TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME T1)))
if (((ASTNode) child).getToken().getType() != HiveParser.TOK_INSERT) {
continue;
http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 24db7d0..95d5635 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -19,14 +19,11 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.metadata.Table;
public class DynamicPartitionCtx implements Serializable {
@@ -43,8 +40,6 @@ public class DynamicPartitionCtx implements Serializable {
private Path rootPath; // the root path DP columns paths start from
private int numBuckets; // number of buckets in each partition
- private Map<String, String> inputToDPCols; // mapping from input column names to DP columns
-
private List<String> spNames; // sp column names
private List<String> dpNames; // dp column names
private String defaultPartName; // default partition name in case of null or empty value
@@ -71,7 +66,6 @@ public class DynamicPartitionCtx implements Serializable {
}
this.numDPCols = dpNames.size();
this.numSPCols = spNames.size();
- this.inputToDPCols = new HashMap<String, String>();
if (this.numSPCols > 0) {
this.spPath = Warehouse.makeDynamicPartName(partSpec);
} else {
@@ -86,25 +80,12 @@ public class DynamicPartitionCtx implements Serializable {
this.spPath = dp.spPath;
this.rootPath = dp.rootPath;
this.numBuckets = dp.numBuckets;
- this.inputToDPCols = dp.inputToDPCols;
this.spNames = dp.spNames;
this.dpNames = dp.dpNames;
this.defaultPartName = dp.defaultPartName;
this.maxPartsPerNode = dp.maxPartsPerNode;
}
- public void mapInputToDP(List<ColumnInfo> fs) {
-
- assert fs.size() == this.numDPCols: "input DP column size != numDPCols";
-
- Iterator<ColumnInfo> itr1 = fs.iterator();
- Iterator<String> itr2 = dpNames.iterator();
-
- while (itr1.hasNext() && itr2.hasNext()) {
- inputToDPCols.put(itr1.next().getInternalName(), itr2.next());
- }
- }
-
public int getMaxPartitionsPerNode() {
return this.maxPartsPerNode;
}
@@ -161,14 +142,6 @@ public class DynamicPartitionCtx implements Serializable {
this.spNames = sp;
}
- public Map<String, String> getInputToDPCols() {
- return this.inputToDPCols;
- }
-
- public void setInputToDPCols(Map<String, String> map) {
- this.inputToDPCols = map;
- }
-
public void setNumDPCols(int dp) {
this.numDPCols = dp;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/24988f77/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index c6ae030..9e89376 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -45,12 +44,11 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
@@ -77,7 +75,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -94,8 +91,7 @@ public class TestFileSinkOperator {
private static TableDesc nonAcidTableDescriptor;
private static TableDesc acidTableDescriptor;
private static ObjectInspector inspector;
- private static List<TFSORow> rows;
- private static ValidTxnList txnList;
+ private static List<Row> rows;
private Path basePath;
private JobConf jc;
@@ -105,34 +101,33 @@ public class TestFileSinkOperator {
Properties properties = new Properties();
properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName());
nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
+ properties.setProperty(serdeConstants.LIST_COLUMNS,"data");
properties = new Properties(properties);
properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1");
acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
-
tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") +
"testFileSinkOperator");
tmpdir.mkdir();
tmpdir.deleteOnExit();
- txnList = new ValidReadTxnList(new long[]{}, 2);
}
@Test
public void testNonAcidWrite() throws Exception {
setBasePath("write");
- setupData(DataFormat.SIMPLE);
+ setupData(DataFormat.WITH_PARTITION_VALUE);
FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, false, 0);
processRows(op);
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
}
@Test
public void testInsert() throws Exception {
setBasePath("insert");
- setupData(DataFormat.SIMPLE);
+ setupData(DataFormat.WITH_PARTITION_VALUE);
FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1);
processRows(op);
Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
}
@Test
@@ -142,7 +137,7 @@ public class TestFileSinkOperator {
FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, false, 2);
processRows(op);
Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID);
}
@Test
@@ -152,7 +147,7 @@ public class TestFileSinkOperator {
FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2);
processRows(op);
Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID);
}
@Test
@@ -161,7 +156,7 @@ public class TestFileSinkOperator {
setupData(DataFormat.WITH_PARTITION_VALUE);
FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0);
processRows(op);
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
}
@@ -174,7 +169,7 @@ public class TestFileSinkOperator {
// We only expect 5 here because we'll get whichever of the partitions published its stats
// last.
Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE);
}
@Test
@@ -184,19 +179,19 @@ public class TestFileSinkOperator {
FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2);
processRows(op);
Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
}
@Test
public void testDeleteDynamicPartitioning() throws Exception {
setBasePath("deleteDP");
- setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
+ setupData(DataFormat.WITH_RECORD_ID);
FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2);
processRows(op);
// We only expect -5 here because we'll get whichever of the partitions published its stats
// last.
Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
- confirmOutput();
+ confirmOutput(DataFormat.WITH_RECORD_ID);
}
@@ -217,64 +212,52 @@ public class TestFileSinkOperator {
}
- private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE,
- WITH_RECORD_ID_AND_PARTITION_VALUE};
+ private enum DataFormat {WITH_RECORD_ID, WITH_PARTITION_VALUE, WITH_RECORD_ID_AND_PARTITION_VALUE};
private void setupData(DataFormat format) {
- // Build object inspector
- inspector = ObjectInspectorFactory.getReflectionObjectInspector
- (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
- rows = new ArrayList<TFSORow>();
-
+ Class<?> rType;
switch (format) {
- case SIMPLE:
- // Build rows
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("mary had a little lamb")
- )
- );
- }
+ case WITH_PARTITION_VALUE:
+ rType = RowWithPartVal.class;
break;
-
case WITH_RECORD_ID:
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("its fleect was white as snow"),
- new RecordIdentifier(1, 1, i)
- )
- );
- }
+ rType = RowWithRecID.class;
break;
-
- case WITH_PARTITION_VALUE:
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("its fleect was white as snow"),
- (i < 5) ? new Text("Monday") : new Text("Tuesday")
- )
- );
- }
- break;
-
case WITH_RECORD_ID_AND_PARTITION_VALUE:
- for (int i = 0; i < 10; i++) {
- rows.add(
- new TFSORow(
- new Text("its fleect was white as snow"),
- (i < 5) ? new Text("Monday") : new Text("Tuesday"),
- new RecordIdentifier(1, 1, i)
- )
- );
- }
+ rType = RowWithPartNRecID.class;
break;
-
default:
- throw new RuntimeException("Unknown option!");
+ throw new RuntimeException("Unknown type");
+ }
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (rType, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+ rows = new ArrayList<Row>();
+ Row r;
+ for (int i = 0; i < 10; i++) {
+ switch (format) {
+ case WITH_PARTITION_VALUE:
+ r =
+ new RowWithPartVal(
+ new Text("mary had a little lamb"),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday"));
+ break;
+ case WITH_RECORD_ID:
+ r = new RowWithRecID(new RecordIdentifier(1, 1, i),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday"));
+ break;
+ case WITH_RECORD_ID_AND_PARTITION_VALUE:
+ r = new RowWithPartNRecID(
+ new Text("its fleect was white as snow"),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday"),
+ new RecordIdentifier(1, 1, i));
+ break;
+ default:
+ throw new RuntimeException("Unknown data format");
+ }
+ rows.add(r);
+
}
}
@@ -300,9 +283,6 @@ public class TestFileSinkOperator {
Map<String, String> partColMap= new LinkedHashMap<String, String>(1);
partColMap.put(PARTCOL_NAME, null);
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
- Map<String, String> partColNames = new HashMap<String, String>(1);
- partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
- dpCtx.setInputToDPCols(partColNames);
//todo: does this need the finalDestination?
desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
} else {
@@ -320,27 +300,27 @@ public class TestFileSinkOperator {
}
private void processRows(FileSinkOperator op) throws HiveException {
- for (TFSORow r : rows) op.process(r, 0);
+ for (Object r : rows) op.process(r, 0);
op.jobCloseOp(jc, true);
op.close(false);
}
- private void confirmOutput() throws IOException, SerDeException {
+ private void confirmOutput(DataFormat rType) throws IOException, SerDeException, CloneNotSupportedException {
Path[] paths = findFilesInBasePath();
- TFSOInputFormat input = new TFSOInputFormat();
+ TFSOInputFormat input = new TFSOInputFormat(rType);
FileInputFormat.setInputPaths(jc, paths);
InputSplit[] splits = input.getSplits(jc, 1);
- RecordReader<NullWritable, TFSORow> reader = input.getRecordReader(splits[0], jc,
+ RecordReader<NullWritable, Row> reader = input.getRecordReader(splits[0], jc,
Mockito.mock(Reporter.class));
NullWritable key = reader.createKey();
- TFSORow value = reader.createValue();
- List<TFSORow> results = new ArrayList<TFSORow>(rows.size());
- List<TFSORow> sortedRows = new ArrayList<TFSORow>(rows.size());
+ Row value = reader.createValue();
+ List<Row> results = new ArrayList<Row>(rows.size());
+ List<Row> sortedRows = new ArrayList<Row>(rows.size());
for (int i = 0; i < rows.size(); i++) {
Assert.assertTrue(reader.next(key, value));
- results.add(new TFSORow(value));
- sortedRows.add(new TFSORow(rows.get(i)));
+ results.add(value.clone());
+ sortedRows.add(rows.get(i));
}
Assert.assertFalse(reader.next(key, value));
Collections.sort(results);
@@ -370,36 +350,172 @@ public class TestFileSinkOperator {
}
}
- private static class TFSORow implements WritableComparable<TFSORow> {
+ /**
+  * Common supertype of the test row layouts, so the input/output formats and
+  * confirmOutput() can handle any of them uniformly.
+  */
+ public interface Row extends WritableComparable<Row> {
+
+   /**
+    * Returns a new Row carrying this row's field values. confirmOutput() relies on it to
+    * snapshot the value object that the RecordReader refills on every next() call.
+    */
+   Row clone() throws CloneNotSupportedException;
+ }
+
+ private static class RowWithRecID implements Row {
+
private RecordIdentifier recId;
- private Text data;
private Text partVal;
- TFSORow() {
- this(null, null, null);
+ public RowWithRecID() {
+ }
+ public RowWithRecID(RecordIdentifier recId, Text partVal) {
+ super();
+ this.recId = recId;
+ this.partVal = partVal;
}
- TFSORow(Text t) {
- this(t, null, null);
+ @Override
+ public
+ Row clone() throws CloneNotSupportedException {
+ return new RowWithRecID(this.recId, this.partVal);
}
- TFSORow(Text t, Text pv) {
- this(t, pv, null);
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ if (partVal == null) {
+ dataOutput.writeBoolean(false);
+ } else {
+ dataOutput.writeBoolean(true);
+ partVal.write(dataOutput);
+ }
+ if (recId == null) {
+ dataOutput.writeBoolean(false);
+ } else {
+ dataOutput.writeBoolean(true);
+ recId.write(dataOutput);
+ }
}
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ boolean notNull = dataInput.readBoolean();
+ if (notNull) {
+ partVal = new Text();
+ partVal.readFields(dataInput);
+ }
+ notNull = dataInput.readBoolean();
+ if (notNull) {
+ recId = new RecordIdentifier();
+ recId.readFields(dataInput);
+ }
- TFSORow(Text t, RecordIdentifier ri) {
- this(t, null, ri);
}
+ @Override
+ public int compareTo(Row row) {
+ RowWithRecID other = (RowWithRecID) row;
+ if (recId == null && other.recId == null) {
+ return comparePartVal(other);
+ } else if (recId == null) {
+ return -1;
+ } else {
+ int rc = recId.compareTo(other.recId);
+ if (rc == 0) return comparePartVal(other);
+ else return rc;
+ }
+ }
+ private int comparePartVal(RowWithRecID other) {
- TFSORow(Text t, Text pv, RecordIdentifier ri) {
+ return partVal.compareTo(other.partVal);
+ }
+
+ /** Equal when compareTo() reports 0; false for null or for rows of another class. */
+ @Override
+ public boolean equals(Object obj) {
+   return (obj instanceof RowWithRecID) && compareTo((RowWithRecID) obj) == 0;
+ }
+
+ /**
+  * Hashes partVal only: compareTo() returns 0 only when partVal compares equal, and
+  * Text's hash is derived from the same bytes it compares, so equal rows share a hash.
+  */
+ @Override
+ public int hashCode() {
+   return (partVal == null) ? 0 : partVal.hashCode();
+ }
+ }
+ /** Test row holding a data column followed by an optional partition value. */
+ private static class RowWithPartVal implements Row {
+
+   // Declaration order matters: the reflection ObjectInspector derives struct fields from it.
+   private Text data;
+   private Text partVal;
+
+   public RowWithPartVal() {
+   }
+
+   public RowWithPartVal(Text data, Text partVal) {
+     this.data = data;
+     this.partVal = partVal;
+   }
+
+   /** Returns a shallow copy that shares this row's Text instances. */
+   @Override
+   public Row clone() throws CloneNotSupportedException {
+     return new RowWithPartVal(this.data, this.partVal);
+   }
+
+   /** Writes data (assumed non-null), then a presence flag and partVal when present. */
+   @Override
+   public void write(DataOutput dataOutput) throws IOException {
+     data.write(dataOutput);
+     if (partVal == null) {
+       dataOutput.writeBoolean(false);
+     } else {
+       dataOutput.writeBoolean(true);
+       partVal.write(dataOutput);
+     }
+   }
+
+   /**
+    * Reads the layout produced by write(). partVal is reset to null when absent so that a
+    * reused instance does not keep the value from a previously read record.
+    */
+   @Override
+   public void readFields(DataInput dataInput) throws IOException {
+     data = new Text();
+     data.readFields(dataInput);
+     if (dataInput.readBoolean()) {
+       partVal = new Text();
+       partVal.readFields(dataInput);
+     } else {
+       partVal = null;
+     }
+   }
+
+   /** Orders by partVal, then by data; null sorts before any non-null value. */
+   @Override
+   public int compareTo(Row row) {
+     RowWithPartVal other = (RowWithPartVal) row;
+     int rc = compareNullable(partVal, other.partVal);
+     return (rc != 0) ? rc : compareNullable(data, other.data);
+   }
+
+   /** Null-safe Text comparison in which null sorts first; never dereferences a null side. */
+   private static int compareNullable(Text a, Text b) {
+     if (a == null) {
+       return (b == null) ? 0 : -1;
+     }
+     return (b == null) ? 1 : a.compareTo(b);
+   }
+
+   /** Equal when compareTo() reports 0; false for null or rows of another class. */
+   @Override
+   public boolean equals(Object obj) {
+     return (obj instanceof RowWithPartVal) && compareTo((RowWithPartVal) obj) == 0;
+   }
+
+   /** Consistent with equals(): Text hashes the same bytes it compares. */
+   @Override
+   public int hashCode() {
+     int h = (data == null) ? 0 : data.hashCode();
+     return 31 * h + ((partVal == null) ? 0 : partVal.hashCode());
+   }
+ }
+ private static class RowWithPartNRecID implements Row {
+ private RecordIdentifier recId;
+ private Text data;
+ private Text partVal;
+
+ RowWithPartNRecID() {
+ this(null, null, null);
+ }
+
+ RowWithPartNRecID(Text t, Text pv, RecordIdentifier ri) {
data = t;
partVal = pv;
recId = ri;
-
}
- TFSORow(TFSORow other) {
- this(other.data, other.partVal, other.recId);
+ @Override
+ public RowWithPartNRecID clone() throws CloneNotSupportedException {
+ return new RowWithPartNRecID(this.data, this.partVal, this.recId);
}
@Override
@@ -437,8 +553,8 @@ public class TestFileSinkOperator {
@Override
public boolean equals(Object obj) {
- if (obj instanceof TFSORow) {
- TFSORow other = (TFSORow) obj;
+ if (obj instanceof RowWithPartNRecID) {
+ RowWithPartNRecID other = (RowWithPartNRecID) obj;
if (data == null && other.data == null) return checkPartVal(other);
else if (data == null) return false;
else if (data.equals(other.data)) return checkPartVal(other);
@@ -448,21 +564,22 @@ public class TestFileSinkOperator {
}
}
- private boolean checkPartVal(TFSORow other) {
+ private boolean checkPartVal(RowWithPartNRecID other) {
if (partVal == null && other.partVal == null) return checkRecId(other);
else if (partVal == null) return false;
else if (partVal.equals(other.partVal)) return checkRecId(other);
else return false;
}
- private boolean checkRecId(TFSORow other) {
+ private boolean checkRecId(RowWithPartNRecID other) {
if (recId == null && other.recId == null) return true;
else if (recId == null) return false;
else return recId.equals(other.recId);
}
@Override
- public int compareTo(TFSORow other) {
+ public int compareTo(Row row) {
+ RowWithPartNRecID other = (RowWithPartNRecID) row;
if (recId == null && other.recId == null) {
return comparePartVal(other);
} else if (recId == null) {
@@ -474,7 +591,7 @@ public class TestFileSinkOperator {
}
}
- private int comparePartVal(TFSORow other) {
+ private int comparePartVal(RowWithPartNRecID other) {
if (partVal == null && other.partVal == null) {
return compareData(other);
} else if (partVal == null) {
@@ -486,21 +603,26 @@ public class TestFileSinkOperator {
}
}
- private int compareData(TFSORow other) {
+ private int compareData(RowWithPartNRecID other) {
if (data == null && other.data == null) return 0;
else if (data == null) return -1;
else return data.compareTo(other.data);
}
}
- private static class TFSOInputFormat extends FileInputFormat<NullWritable, TFSORow>
- implements AcidInputFormat<NullWritable, TFSORow> {
+ private static class TFSOInputFormat extends FileInputFormat<NullWritable, Row>
+ implements AcidInputFormat<NullWritable, Row> {
FSDataInputStream in[] = null;
int readingFrom = -1;
+ DataFormat rType;
+
+ public TFSOInputFormat(DataFormat rType) {
+ this.rType = rType;
+ }
@Override
- public RecordReader<NullWritable, TFSORow> getRecordReader(
+ public RecordReader<NullWritable, Row> getRecordReader(
InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
if (in == null) {
Path paths[] = FileInputFormat.getInputPaths(entries);
@@ -511,10 +633,10 @@ public class TestFileSinkOperator {
}
readingFrom = 0;
}
- return new RecordReader<NullWritable, TFSORow>() {
+ return new RecordReader<NullWritable, Row>() {
@Override
- public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws
+ public boolean next(NullWritable nullWritable, Row tfsoRecord) throws
IOException {
try {
tfsoRecord.readFields(in[readingFrom]);
@@ -532,8 +654,18 @@ public class TestFileSinkOperator {
}
@Override
- public TFSORow createValue() {
- return new TFSORow();
+ public Row createValue() {
+ switch (rType) {
+ case WITH_RECORD_ID_AND_PARTITION_VALUE:
+ return new RowWithPartNRecID();
+ case WITH_PARTITION_VALUE:
+ return new RowWithPartVal();
+ case WITH_RECORD_ID:
+ return new RowWithRecID();
+
+ default:
+ throw new RuntimeException("Unknown row Type");
+ }
}
@Override
@@ -554,14 +686,14 @@ public class TestFileSinkOperator {
}
@Override
- public RowReader<TFSORow> getReader(InputSplit split,
+ public RowReader<Row> getReader(InputSplit split,
Options options) throws
IOException {
return null;
}
@Override
- public RawReader<TFSORow> getRawReader(Configuration conf,
+ public RawReader<Row> getRawReader(Configuration conf,
boolean collapseEvents,
int bucket,
ValidTxnList validTxnList,
@@ -578,9 +710,9 @@ public class TestFileSinkOperator {
}
}
- public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, TFSORow>
- implements AcidOutputFormat<NullWritable, TFSORow> {
- List<TFSORow> records = new ArrayList<TFSORow>();
+ public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, Row>
+ implements AcidOutputFormat<NullWritable, Row> {
+ List<Row> records = new ArrayList<>();
long numRecordsAdded = 0;
FSDataOutputStream out = null;
@@ -588,7 +720,6 @@ public class TestFileSinkOperator {
public RecordUpdater getRecordUpdater(final Path path, final Options options) throws
IOException {
- final StructObjectInspector inspector = (StructObjectInspector)options.getInspector();
return new RecordUpdater() {
@Override
public void insert(long currentTransaction, Object row) throws IOException {
@@ -608,9 +739,8 @@ public class TestFileSinkOperator {
}
private void addRow(Object row) {
- assert row instanceof TFSORow : "Expected TFSORow but got " +
- row.getClass().getName();
- records.add((TFSORow)row);
+ assert row instanceof Row : "Expected Row but got " + row.getClass().getName();
+ records.add((Row)row);
}
@Override
@@ -619,7 +749,7 @@ public class TestFileSinkOperator {
FileSystem fs = path.getFileSystem(options.getConfiguration());
out = fs.create(path);
}
- for (TFSORow r : records) r.write(out);
+ for (Writable r : records) r.write(out);
records.clear();
out.flush();
}
@@ -657,8 +787,8 @@ public class TestFileSinkOperator {
return new FileSinkOperator.RecordWriter() {
@Override
public void write(Writable w) throws IOException {
- Assert.assertTrue(w instanceof TFSORow);
- records.add((TFSORow) w);
+ Assert.assertTrue(w instanceof Row);
+ records.add((Row)w);
}
@Override
@@ -667,7 +797,7 @@ public class TestFileSinkOperator {
FileSystem fs = finalOutPath.getFileSystem(jc);
out = fs.create(finalOutPath);
}
- for (TFSORow r : records) r.write(out);
+ for (Writable r : records) r.write(out);
records.clear();
out.flush();
out.close();
@@ -676,7 +806,7 @@ public class TestFileSinkOperator {
}
@Override
- public RecordWriter<NullWritable, TFSORow> getRecordWriter(
+ public RecordWriter<NullWritable, Row> getRecordWriter(
FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws
IOException {
return null;
@@ -688,7 +818,7 @@ public class TestFileSinkOperator {
}
}
- public static class TFSOSerDe implements SerDe {
+ public static class TFSOSerDe extends AbstractSerDe {
@Override
public void initialize(Configuration conf, Properties tbl) throws SerDeException {
@@ -697,20 +827,18 @@ public class TestFileSinkOperator {
@Override
public Class<? extends Writable> getSerializedClass() {
- return TFSORow.class;
+ return RowWithPartNRecID.class;
}
@Override
public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
- assert obj instanceof TFSORow : "Expected TFSORow or decendent, got "
- + obj.getClass().getName();
- return (TFSORow)obj;
+ assert obj instanceof Row : "Expected Row or decendent, got " + obj.getClass().getName();
+ return (Row)obj;
}
@Override
public Object deserialize(Writable blob) throws SerDeException {
- assert blob instanceof TFSORow : "Expected TFSORow or decendent, got "
- + blob.getClass().getName();
+ assert blob instanceof Row : "Expected Row or decendent, got "+ blob.getClass().getName();
return blob;
}