You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by li...@apache.org on 2016/03/05 12:45:35 UTC

parquet-mr git commit: PARQUET-528: Fix flush() for RecordConsumer and implementations

Repository: parquet-mr
Updated Branches:
  refs/heads/master fb46b941f -> 1f91c79de


PARQUET-528: Fix flush() for RecordConsumer and implementations

`flush()` was added to `RecordConsumer` and `MessageColumnIO` to help implement null caching.

However, other `RecordConsumer` implementations should also implement `flush()` properly. For instance, `RecordConsumerLoggingWrapper` and `ValidatingRecordConsumer` should call `delegate.flush()` in their `flush()` methods; otherwise, data might be mistakenly truncated.

This PR:
- keeps `flush()` non-abstract (a no-op by default) in `RecordConsumer` for compatibility, with a TODO to make it abstract in 2.0
- implements `flush()` properly for all `RecordConsumer` subclasses, specifically:
 - `RecordConsumerLoggingWrapper`
 - `ValidatingRecordConsumer`
 - `ConverterConsumer`
 - `ExpectationValidatingRecordConsumer`

Author: proflin <pr...@gmail.com>
Author: Liwei Lin <pr...@gmail.com>

Closes #325 from proflin/PARQUET-528 and squashes the following commits:

2c90740 [proflin] Minor style issue
25444b9 [proflin] Still keep RecordConsumer.flush() non-abstract
8776e3a [proflin] PARQUET-528: Fix flush() for RecordConsumer and implementations
bb4283a [Liwei Lin] Merge branch 'master' of https://github.com/proflin/parquet-mr
839b458 [proflin] Merge remote-tracking branch 'refs/remotes/apache/master'


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/1f91c79d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/1f91c79d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/1f91c79d

Branch: refs/heads/master
Commit: 1f91c79de5e2d852c6e7d0cf7a4255087ef618ef
Parents: fb46b94
Author: proflin <pr...@gmail.com>
Authored: Sat Mar 5 19:45:25 2016 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Sat Mar 5 19:45:25 2016 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/parquet/io/MessageColumnIO.java    | 4 +++-
 .../org/apache/parquet/io/RecordConsumerLoggingWrapper.java | 9 +++++++++
 .../org/apache/parquet/io/ValidatingRecordConsumer.java     | 4 ++++
 .../main/java/org/apache/parquet/io/api/RecordConsumer.java | 1 +
 .../test/java/org/apache/parquet/io/ConverterConsumer.java  | 8 ++++++++
 .../parquet/io/ExpectationValidatingRecordConsumer.java     | 8 ++++++++
 6 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index cb1c8d6..f962105 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -487,7 +487,9 @@ public class MessageColumnIO extends GroupColumnIO {
     }
 
 
-    //should flush null for all groups
+    /**
+     * Flush null for all groups
+     */
     @Override
     public void flush() {
       flushCachedNulls(MessageColumnIO.this);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
index 642c1f4..7a8b1c1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
@@ -138,6 +138,15 @@ public class RecordConsumerLoggingWrapper extends RecordConsumer {
      * {@inheritDoc}
      */
     @Override
+    public void flush() {
+      if (DEBUG) log("<!-- flush -->");
+      delegate.flush();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public void endGroup() {
       if (DEBUG) log("<!-- end group -->");
       if (DEBUG) --indent;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
index bf4c196..46f0aae 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
@@ -126,6 +126,10 @@ public class ValidatingRecordConsumer extends RecordConsumer {
     types.pop();
     previousField.pop();
   }
+
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void flush(){
     delegate.flush();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
index e11d763..e1ab60c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
@@ -129,6 +129,7 @@ abstract public class RecordConsumer {
    * NoOps by default
    * Subclass class can implement its own flushing logic
    */
+  //TODO: make this abstract in 2.0
   public void flush() {
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java b/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
index 635657b..85e894f 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
@@ -114,4 +114,12 @@ public class ConverterConsumer extends RecordConsumer {
     currentPrimitive.addDouble(value);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void flush() {
+    // do nothing
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
index 36538ea..30f4925 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
@@ -100,5 +100,13 @@ final public class ExpectationValidatingRecordConsumer extends
     validate("addDouble("+value+")");
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void flush() {
+    validate("flush()");
+  }
+
 }