You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by li...@apache.org on 2016/03/05 12:45:35 UTC
parquet-mr git commit: PARQUET-528: Fix flush() for RecordConsumer
and implementations
Repository: parquet-mr
Updated Branches:
refs/heads/master fb46b941f -> 1f91c79de
PARQUET-528: Fix flush() for RecordConsumer and implementations
`flush()` was added to `RecordConsumer` and `MessageColumnIO` to help implement null caching.
However, other `RecordConsumer` implementations should also implement `flush()` properly. For instance, `RecordConsumerLoggingWrapper` and `ValidatingRecordConsumer` should call `delegate.flush()` in their `flush()` methods; otherwise data might be mistakenly truncated.
This PR:
- adds a TODO to make `flush()` abstract in `RecordConsumer` in 2.0 (for now it remains a non-abstract no-op)
- implements `flush()` properly for all `RecordConsumer` subclasses, specifically:
- `RecordConsumerLoggingWrapper`
- `ValidatingRecordConsumer`
- `ConverterConsumer`
- `ExpectationValidatingRecordConsumer`
Author: proflin <pr...@gmail.com>
Author: Liwei Lin <pr...@gmail.com>
Closes #325 from proflin/PARQUET-528 and squashes the following commits:
2c90740 [proflin] Minor style issue
25444b9 [proflin] Still keep RecordConsumer.flush() non-abstract
8776e3a [proflin] PARQUET-528: Fix flush() for RecordConsumer and implementations
bb4283a [Liwei Lin] Merge branch 'master' of https://github.com/proflin/parquet-mr
839b458 [proflin] Merge remote-tracking branch 'refs/remotes/apache/master'
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/1f91c79d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/1f91c79d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/1f91c79d
Branch: refs/heads/master
Commit: 1f91c79de5e2d852c6e7d0cf7a4255087ef618ef
Parents: fb46b94
Author: proflin <pr...@gmail.com>
Authored: Sat Mar 5 19:45:25 2016 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Sat Mar 5 19:45:25 2016 +0800
----------------------------------------------------------------------
.../main/java/org/apache/parquet/io/MessageColumnIO.java | 4 +++-
.../org/apache/parquet/io/RecordConsumerLoggingWrapper.java | 9 +++++++++
.../org/apache/parquet/io/ValidatingRecordConsumer.java | 4 ++++
.../main/java/org/apache/parquet/io/api/RecordConsumer.java | 1 +
.../test/java/org/apache/parquet/io/ConverterConsumer.java | 8 ++++++++
.../parquet/io/ExpectationValidatingRecordConsumer.java | 8 ++++++++
6 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
index cb1c8d6..f962105 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -487,7 +487,9 @@ public class MessageColumnIO extends GroupColumnIO {
}
- //should flush null for all groups
+ /**
+ * Flush null for all groups
+ */
@Override
public void flush() {
flushCachedNulls(MessageColumnIO.this);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
index 642c1f4..7a8b1c1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
@@ -138,6 +138,15 @@ public class RecordConsumerLoggingWrapper extends RecordConsumer {
* {@inheritDoc}
*/
@Override
+ public void flush() {
+ if (DEBUG) log("<!-- flush -->");
+ delegate.flush();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public void endGroup() {
if (DEBUG) log("<!-- end group -->");
if (DEBUG) --indent;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
index bf4c196..46f0aae 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
@@ -126,6 +126,10 @@ public class ValidatingRecordConsumer extends RecordConsumer {
types.pop();
previousField.pop();
}
+
+ /**
+ * {@inheritDoc}
+ */
@Override
public void flush(){
delegate.flush();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
index e11d763..e1ab60c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
@@ -129,6 +129,7 @@ abstract public class RecordConsumer {
* NoOps by default
* Subclass class can implement its own flushing logic
*/
+ //TODO: make this abstract in 2.0
public void flush() {
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java b/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
index 635657b..85e894f 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/ConverterConsumer.java
@@ -114,4 +114,12 @@ public class ConverterConsumer extends RecordConsumer {
currentPrimitive.addDouble(value);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flush() {
+ // do nothing
+ }
+
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/1f91c79d/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
index 36538ea..30f4925 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/ExpectationValidatingRecordConsumer.java
@@ -100,5 +100,13 @@ final public class ExpectationValidatingRecordConsumer extends
validate("addDouble("+value+")");
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flush() {
+ validate("flush()");
+ }
+
}