Posted to commits@hive.apache.org by vg...@apache.org on 2018/05/15 22:41:04 UTC

[44/50] [abbrv] hive git commit: HIVE-19534: Allow implementations to access member variables of AbstractRecordWriter (Prasanth Jayachandran reviewed by Matt Burgess, Ashutosh Chauhan)

HIVE-19534: Allow implementations to access member variables of AbstractRecordWriter (Prasanth Jayachandran reviewed by Matt Burgess, Ashutosh Chauhan)
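
For context, the practical effect of this patch is that RecordWriter implementations outside this class (including subclasses in other packages) can now read or override the formerly private/package-private fields and hooks of AbstractRecordWriter. The sketch below is illustrative only and is not part of this commit: the package, class, and logger names are made up, and it assumes encode(byte[]) is the only abstract method a subclass must implement, per the hunk shown further down; it may not compile against the full class as-is.

package com.example.streaming;                     // hypothetical package

import java.util.List;

import org.apache.hive.streaming.AbstractRecordWriter;
import org.apache.hive.streaming.SerializationError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleRecordWriter extends AbstractRecordWriter {

  private static final Logger LOG = LoggerFactory.getLogger(ExampleRecordWriter.class);

  public ExampleRecordWriter(String lineDelimiter) {
    super(lineDelimiter);
  }

  @Override
  public Object encode(byte[] record) throws SerializationError {
    // inputColumns/inputTypes were package-private before this change and are
    // now protected, so an out-of-package subclass can consult the table
    // schema while turning raw bytes into a row object.
    List<String> columns = inputColumns;
    List<String> types = inputTypes;
    // ... parse 'record' against columns/types and return a row object
    //     compatible with inputRowObjectInspector (parsing omitted) ...
    return null; // placeholder for the sketch
  }

  @Override
  protected void logStats(String prefix) {
    super.logStats(prefix);
    // fullyQualifiedTableName was private before; it can now be used to tag
    // subclass-specific metrics or log lines.
    LOG.info("{} table={}", prefix, fullyQualifiedTableName);
  }
}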


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3ea0356f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3ea0356f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3ea0356f

Branch: refs/heads/branch-3.0.0
Commit: 3ea0356f7dd9fc4d3406806d80c349187afd9d64
Parents: 66f6748
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon May 14 17:19:34 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon May 14 17:19:34 2018 -0700

----------------------------------------------------------------------
 .../hive/streaming/AbstractRecordWriter.java    | 92 ++++++++++----------
 1 file changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3ea0356f/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index b6c8890..0866850 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -64,48 +64,48 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]";
   protected HiveConf conf;
-  private StreamingConnection conn;
+  protected StreamingConnection conn;
   protected Table table;
-  List<String> inputColumns;
-  List<String> inputTypes;
-  private String fullyQualifiedTableName;
-  private Map<String, List<RecordUpdater>> updaters = new HashMap<>();
-  private Map<String, Path> partitionPaths = new HashMap<>();
-  private Set<String> addedPartitions = new HashSet<>();
+  protected List<String> inputColumns;
+  protected List<String> inputTypes;
+  protected String fullyQualifiedTableName;
+  protected Map<String, List<RecordUpdater>> updaters = new HashMap<>();
+  protected Map<String, Path> partitionPaths = new HashMap<>();
+  protected Set<String> addedPartitions = new HashSet<>();
   // input OI includes table columns + partition columns
-  private StructObjectInspector inputRowObjectInspector;
+  protected StructObjectInspector inputRowObjectInspector;
   // output OI strips off the partition columns and retains other columns
-  private ObjectInspector outputRowObjectInspector;
-  private List<String> partitionColumns = new ArrayList<>();
-  private ObjectInspector[] partitionObjInspectors = null;
-  private StructField[] partitionStructFields = null;
-  private Object[] partitionFieldData;
-  private ObjectInspector[] bucketObjInspectors = null;
-  private StructField[] bucketStructFields = null;
-  private Object[] bucketFieldData;
-  private List<Integer> bucketIds = new ArrayList<>();
-  private int totalBuckets;
-  private String defaultPartitionName;
-  private boolean isBucketed;
-  private AcidOutputFormat<?, ?> acidOutputFormat;
-  private Long curBatchMinWriteId;
-  private Long curBatchMaxWriteId;
-  private final String lineDelimiter;
-  private HeapMemoryMonitor heapMemoryMonitor;
+  protected ObjectInspector outputRowObjectInspector;
+  protected List<String> partitionColumns = new ArrayList<>();
+  protected ObjectInspector[] partitionObjInspectors = null;
+  protected StructField[] partitionStructFields = null;
+  protected Object[] partitionFieldData;
+  protected ObjectInspector[] bucketObjInspectors = null;
+  protected StructField[] bucketStructFields = null;
+  protected Object[] bucketFieldData;
+  protected List<Integer> bucketIds = new ArrayList<>();
+  protected int totalBuckets;
+  protected String defaultPartitionName;
+  protected boolean isBucketed;
+  protected AcidOutputFormat<?, ?> acidOutputFormat;
+  protected Long curBatchMinWriteId;
+  protected Long curBatchMaxWriteId;
+  protected final String lineDelimiter;
+  protected HeapMemoryMonitor heapMemoryMonitor;
   // if low memory canary is set and if records after set canary exceeds threshold, trigger a flush.
   // This is to avoid getting notified of low memory too often and flushing too often.
-  private AtomicBoolean lowMemoryCanary;
-  private long ingestSizeBytes = 0;
-  private boolean autoFlush;
-  private float memoryUsageThreshold;
-  private long ingestSizeThreshold;
+  protected AtomicBoolean lowMemoryCanary;
+  protected long ingestSizeBytes = 0;
+  protected boolean autoFlush;
+  protected float memoryUsageThreshold;
+  protected long ingestSizeThreshold;
 
   public AbstractRecordWriter(final String lineDelimiter) {
     this.lineDelimiter = lineDelimiter == null || lineDelimiter.isEmpty() ?
       DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter;
   }
 
-  private static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
+  protected static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener {
     private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName());
     private final AtomicBoolean lowMemoryCanary;
 
@@ -179,7 +179,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void setupMemoryMonitoring() {
+  protected void setupMemoryMonitoring() {
     this.autoFlush = conf.getBoolVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_ENABLED);
     this.memoryUsageThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_HEAP_MEMORY_MONITOR_USAGE_THRESHOLD);
     this.ingestSizeThreshold = conf.getSizeVar(HiveConf.ConfVars.HIVE_STREAMING_AUTO_FLUSH_CHECK_INTERVAL_SIZE);
@@ -201,7 +201,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void prepareBucketingFields() {
+  protected void prepareBucketingFields() {
     this.isBucketed = table.getSd().getNumBuckets() > 0;
     // For unbucketed tables we have exactly 1 RecordUpdater (until HIVE-19208) for each AbstractRecordWriter which
     // ends up writing to a file bucket_000000.
@@ -219,7 +219,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void preparePartitioningFields() {
+  protected void preparePartitioningFields() {
     final int numPartitions = table.getPartitionKeys().size();
     this.partitionFieldData = new Object[numPartitions];
     this.partitionObjInspectors = new ObjectInspector[numPartitions];
@@ -240,12 +240,12 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   /**
    * used to tag error msgs to provided some breadcrumbs
    */
-  private String getWatermark(String partition) {
+  protected String getWatermark(String partition) {
     return partition + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
   }
 
   // return the column numbers of the bucketed columns
-  private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+  protected List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
     ArrayList<Integer> result = new ArrayList<>(bucketCols.size());
     HashSet<String> bucketSet = new HashSet<>(bucketCols);
     for (int i = 0; i < cols.size(); i++) {
@@ -275,7 +275,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
   public abstract Object encode(byte[] record) throws SerializationError;
 
   // returns the bucket number to which the record belongs to
-  private int getBucket(Object row) {
+  protected int getBucket(Object row) {
     if (!isBucketed) {
       return 0;
     }
@@ -288,7 +288,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
       ObjectInspectorUtils.getBucketNumberOld(bucketFields, bucketObjInspectors, totalBuckets);
   }
 
-  private List<String> getPartitionValues(final Object row) {
+  protected List<String> getPartitionValues(final Object row) {
     if (!conn.isPartitionedTable()) {
       return null;
     }
@@ -359,7 +359,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+  protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
     , StructObjectInspector recordObjInspector) {
     ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
 
@@ -371,14 +371,14 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return result;
   }
 
-  private Object[] getBucketFields(Object row) {
+  protected Object[] getBucketFields(Object row) {
     for (int i = 0; i < bucketIds.size(); i++) {
       bucketFieldData[i] = inputRowObjectInspector.getStructFieldData(row, bucketStructFields[i]);
     }
     return bucketFieldData;
   }
 
-  private Object[] getPartitionFields(Object row) {
+  protected Object[] getPartitionFields(Object row) {
     for (int i = 0; i < partitionFieldData.length; i++) {
       partitionFieldData[i] = inputRowObjectInspector.getStructFieldData(row, partitionStructFields[i]);
     }
@@ -412,7 +412,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
   }
 
-  private void checkAutoFlush() throws StreamingIOFailure {
+  protected void checkAutoFlush() throws StreamingIOFailure {
     if (!autoFlush) {
       return;
     }
@@ -444,7 +444,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return addedPartitions;
   }
 
-  private RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
+  protected RecordUpdater createRecordUpdater(final Path partitionPath, int bucketId, Long minWriteId,
     Long maxWriteID)
     throws IOException {
     // Initialize table properties from the table parameters. This is required because the table
@@ -463,7 +463,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         .finalDestination(partitionPath));
   }
 
-  private RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
+  protected RecordUpdater getRecordUpdater(List<String> partitionValues, int bucketId) throws StreamingIOFailure {
     RecordUpdater recordUpdater;
     String key;
     Path destLocation;
@@ -510,7 +510,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return recordUpdater;
   }
 
-  private List<RecordUpdater> initializeBuckets() {
+  protected List<RecordUpdater> initializeBuckets() {
     List<RecordUpdater> result = new ArrayList<>(totalBuckets);
     for (int bucket = 0; bucket < totalBuckets; bucket++) {
       result.add(bucket, null); //so that get(i) returns null rather than ArrayOutOfBounds
@@ -518,7 +518,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     return result;
   }
 
-  private void logStats(final String prefix) {
+  protected void logStats(final String prefix) {
     int openRecordUpdaters = updaters.values()
       .stream()
       .mapToInt(List::size)