Posted to commits@hive.apache.org by vg...@apache.org on 2018/05/15 22:41:04 UTC
[44/50] [abbrv] hive git commit: HIVE-19534: Allow implementations to access member variables of AbstractRecordWriter (Prasanth Jayachandran reviewed by Matt Burgess, Ashutosh Chauhan)
HIVE-19534: Allow implementations to access member variables of AbstractRecordWriter (Prasanth Jayachandran reviewed by Matt Burgess, Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3ea0356f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3ea0356f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3ea0356f
Branch: refs/heads/branch-3.0.0
Commit: 3ea0356f7dd9fc4d3406806d80c349187afd9d64
Parents: 66f6748
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon May 14 17:19:34 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon May 14 17:19:34 2018 -0700
----------------------------------------------------------------------
.../hive/streaming/AbstractRecordWriter.java | 92 ++++++++++----------
1 file changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
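For context, the practical effect of the diff below is that writer implementations outside this file can subclass AbstractRecordWriter and reuse its internal state and helper methods instead of duplicating them. What follows is a minimal sketch of what that enables, referencing only members whose visibility changes in this diff; the class name InstrumentedRecordWriter and its logging behavior are illustrative assumptions, not code from this commit. The sketch is kept abstract so it need not restate the abstract methods (such as encode) that concrete writers must still implement.

package org.apache.hive.streaming;

// Illustrative sketch, not part of this commit. It compiles against the
// post-HIVE-19534 surface: checkAutoFlush(), logStats() and
// fullyQualifiedTableName are protected after this change and were
// private before it.
public abstract class InstrumentedRecordWriter extends AbstractRecordWriter {

  public InstrumentedRecordWriter(String lineDelimiter) {
    super(lineDelimiter);
  }

  @Override
  protected void checkAutoFlush() throws StreamingIOFailure {
    // Log the writer's current stats before delegating to the
    // parent's auto-flush logic.
    logStats("pre-flush stats for " + fullyQualifiedTableName + ": ");
    super.checkAutoFlush();
  }
}

With the old private visibility even a subclass could not read these fields or override these hooks; protected opens them to any implementation that extends the class, such as the external integrations that motivated this change.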
http://git-wip-us.apache.org/repos/asf/hive/blob/3ea0356f/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index b6c8890..0866850 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -64,48 +64,48 @@ public abstract class AbstractRecordWriter implements RecordWriter {
private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
protected HiveConf conf;
- private StreamingConnection conn;
+ protected StreamingConnection conn;
protected Table table;
- List<String> inputColumns;
- List<String> inputTypes;
- private String fullyQualifiedTableName;
- private Map<String, List<RecordUpdater>> updaters = new HashMap<>();
- private Map<String, Path> partitionPaths = new HashMap<>();
- private Set<String> addedPartitions = new HashSet<>();
+ protected List<String> inputColumns;
+ protected List<String> inputTypes;
+ protected String fullyQualifiedTableName;
+ protected Map<String, List<RecordUpdater>> updaters = new HashMap<>();
+ protected Map<String, Path> partitionPaths = new HashMap<>();
+ protected Set<String> addedPartitions = new HashSet<>();
// input OI includes table columns + partition columns
- private StructObjectInspector inputRowObjectInspector;
+ protected StructObjectInspector inputRowObjectInspector;
// output OI strips off the partition columns and retains other columns
- private ObjectInspector outputRowObjectInspector;
- private List<String> partitionColumns = new ArrayList<>();
- private ObjectInspector[] partitionObjInspectors = null;
- private StructField[] partitionStructFields = null;
- private Object[] partitionFieldData;
- private ObjectInspector[] bucketObjInspectors = null;
- private StructField[] bucketStructFields = null;
- private Object[] bucketFieldData;
- private List<Integer> bucketIds = new ArrayList<>();
- private int totalBuckets;
- private String defaultPartitionName;
- private boolean isBucketed;
- private AcidOutputFormat<?, ?> acidOutputFormat;
- private Long curBatchMinWriteId;
- private Long curBatchMaxWriteId;
- private final String lineDelimiter;
- private HeapMemoryMonitor heapMemoryMonitor;
+ protected ObjectInspector outputRowObjectInspector;
+ protected List<String> partitionColumns = new ArrayList<>();
+ protected ObjectInspector[] partitionObjInspectors = null;
+ protected StructField[] partitionStructFields = null;
+ protected Object[] partitionFieldData;
+ protected ObjectInspector[] bucketObjInspectors = null;
+ protected StructField[] bucketStructFields = null;
+ protected Object[] bucketFieldData;
+ protected List<Integer> bucketIds = new ArrayList<>();
+ protected int totalBuckets;
+ protected String defaultPartitionName;
+ protected boolean isBucketed;
+ protected AcidOutputFormat<?, ?> acidOutputFormat;
+ protected Long curBatchMinWriteId;
+ protected Long curBatchMaxWriteId;
+ protected final String lineDelimiter;
+ protected HeapMemoryMonitor heapMemoryMonitor;
// if low memory canary is set and if records after set canary exceeds threshold, trigger a flush.
// This is to avoid getting notified of low memory too often and flushing too often.
- private AtomicBoolean lowMemoryCanary;
- private long ingestSizeBytes = 0;
- private boolean autoFlush;
- private float memoryUsageThreshold;
- private long ingestSizeThreshold;
+ protected AtomicBoolean lowMemoryCanary;
+ protected long ingestSizeBytes = 0;
+ protected boolean autoFlush;
+ protected float memoryUsageThreshold;
+ protected long ingestSizeThreshold;
public AbstractRecordWriter(final String lineDelimiter) {
this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter;
}
- private static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
+ protected static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName());
private final AtomicBoolean lowMemoryCanary;
@@ -179,7 +179,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
}
- private void setupMemoryMonitoring() {
+ protected void setupMemoryMonitoring() {
this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
@@ -201,7 +201,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
}
- private void prepareBucketingFields() {
+ protected void prepareBucketingFields() {
this.isBucketed = table.getSd().getNumBuckets() > 0;
// For unbucketed tables we have exactly 1 RecordUpdater (until HIVE-19208) for each AbstractRecordWriter which
// ends up writing to a file bucket_000000.
@@ -219,7 +219,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
}
- private void preparePartitioningFields() {
+ protected void preparePartitioningFields() {
final int numPartitions = table.getPartitionKeys().size();
this.partitionFieldData = new Object[numPartitions];
this.partitionObjInspectors = new ObjectInspector[numPartitions];
@@ -240,12 +240,12 @@ public abstract class AbstractRecordWriter implements RecordWriter {
/**
* used to tag error msgs to provided some breadcrumbs
*/
- private String getWatermark(String partition) {
+ protected String getWatermark(String partition) {
return partition + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
}
// return the column numbers of the bucketed columns
- private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+ protected List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
ArrayList<Integer> result = new ArrayList<>(bucketCols.size());
HashSet<String> bucketSet = new HashSet<>(bucketCols);
for (int i = 0; i < cols.size(); i++) {
@@ -275,7 +275,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
public abstract Object encode(byte[] record) throws SerializationError;
// returns the bucket number to which the record belongs to
- private int getBucket(Object row) {
+ protected int getBucket(Object row) {
if (!isBucketed) {
return 0;
}
@@ -288,7 +288,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
ObjectInspectorUtils.getBucketNumberOld(bucketFields, bucketObjInspectors, totalBuckets);
}
- private List<String> getPartitionValues(final Object row) {
+ protected List<String> getPartitionValues(final Object row) {
if (!conn.isPartitionedTable()) {
return null;
}
@@ -359,7 +359,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
}
- private static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+ protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
, StructObjectInspector recordObjInspector) {
ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
@@ -371,14 +371,14 @@ public abstract class AbstractRecordWriter implements RecordWriter {
return result;
}
- private Object[] getBucketFields(Object row) {
+ protected Object[] getBucketFields(Object row) {
for (int i = 0; i < bucketIds.size(); i++) {
bucketFieldData[i] = inputRowObjectInspector.getStructFieldData(row, bucketStructFields[i]);
}
return bucketFieldData;
}
- private Object[] getPartitionFields(Object row) {
+ protected Object[] getPartitionFields(Object row) {
for (int i = 0; i < partitionFieldData.length; i++) {
partitionFieldData[i] = inputRowObjectInspector.getStructFieldData(row, partitionStructFields[i]);
}
@@ -412,7 +412,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
}
}
- private void checkAutoFlush() throws StreamingIOFailure {
+ protected void checkAutoFlush() throws StreamingIOFailure {
if (!autoFlush) {
return;
}
@@ -444,7 +444,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
return addedPartitions;
}
- private RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
+ protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
Long maxWriteID)
throws IOException {
// Initialize table properties from the table parameters. This is required because the table
@@ -463,7 +463,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
.finalDestination(partitionPath));
}
- private RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
+ protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
RecordUpdater recordUpdater;
String key;
Path destLocation;
@@ -510,7 +510,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
return recordUpdater;
}
- private List<RecordUpdater> initializeBuckets() {
+ protected List<RecordUpdater> initializeBuckets() {
List<RecordUpdater> result = new ArrayList<>(totalBuckets);
for (int bucket = 0; bucket < totalBuckets; bucket++) {
result.add(bucket, null); //so that get(i) returns null rather than ArrayOutOfBounds
@@ -518,7 +518,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
return result;
}
- private void logStats(final String prefix) {
+ protected void logStats(final String prefix) {
int openRecordUpdaters = updaters.values()
.stream()
.mapToInt(List::size)