You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/07/05 13:21:00 UTC
flink git commit: [hotfix][filesystem] Remove incorrect equals methods in StreamWriters
Repository: flink
Updated Branches:
refs/heads/master a9b5579b3 -> e236680f1
[hotfix][filesystem] Remove incorrect equals methods in StreamWriters
This closes #6262.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e236680f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e236680f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e236680f
Branch: refs/heads/master
Commit: e236680f14ebbaf97243dc15718aad93b68b584a
Parents: a9b5579
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Jul 5 11:28:35 2018 +0200
Committer: Piotr Nowojski <pi...@gmail.com>
Committed: Thu Jul 5 15:19:41 2018 +0200
----------------------------------------------------------------------
.../connectors/fs/AvroKeyValueSinkWriter.java | 25 +-------
.../connectors/fs/SequenceFileWriter.java | 36 +++++-------
.../connectors/fs/StreamWriterBase.java | 22 +------
.../streaming/connectors/fs/StringWriter.java | 25 +-------
.../fs/AvroKeyValueSinkWriterTest.java | 8 +--
.../connectors/fs/RollingSinkITCase.java | 2 +-
.../connectors/fs/SequenceFileWriterTest.java | 8 +--
.../fs/StreamWriterBaseComparator.java | 60 ++++++++++++++++++++
.../connectors/fs/StringWriterTest.java | 8 +--
.../fs/bucketing/BucketingSinkTest.java | 3 +-
10 files changed, 93 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 6b2f7d6..0f73e8c 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -40,7 +40,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
-import java.util.Objects;
/**
* Implementation of AvroKeyValue writer that can be used in Sink.
@@ -204,7 +203,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
}
@Override
- public Writer<Tuple2<K, V>> duplicate() {
+ public AvroKeyValueSinkWriter<K, V> duplicate() {
return new AvroKeyValueSinkWriter<>(this);
}
@@ -335,25 +334,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
}
}
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), properties);
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other == null) {
- return false;
- }
- if (getClass() != other.getClass()) {
- return false;
- }
- AvroKeyValueSinkWriter<K, V> writer = (AvroKeyValueSinkWriter<K, V>) other;
- // field comparison
- return Objects.equals(properties, writer.properties)
- && super.equals(other);
+ Map<String, String> getProperties() {
+ return properties;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 2f42ef7..17b16dd 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.IOException;
-import java.util.Objects;
/**
* A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
@@ -152,32 +151,23 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
}
@Override
- public Writer<Tuple2<K, V>> duplicate() {
+ public SequenceFileWriter<K, V> duplicate() {
return new SequenceFileWriter<>(this);
}
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), compressionCodecName, compressionType, keyClass, valueClass);
+ String getCompressionCodecName() {
+ return compressionCodecName;
}
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other == null) {
- return false;
- }
- if (getClass() != other.getClass()) {
- return false;
- }
- SequenceFileWriter<K, V> writer = (SequenceFileWriter<K, V>) other;
- // field comparison
- return Objects.equals(compressionCodecName, writer.compressionCodecName)
- && Objects.equals(compressionType, writer.compressionType)
- && Objects.equals(keyClass, writer.keyClass)
- && Objects.equals(valueClass, writer.valueClass)
- && super.equals(other);
+ SequenceFile.CompressionType getCompressionType() {
+ return compressionType;
+ }
+
+ Class<K> getKeyClass() {
+ return keyClass;
+ }
+
+ Class<V> getValueClass() {
+ return valueClass;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index f625ef3..d3035a5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
-import java.util.Objects;
/**
* Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
@@ -102,24 +101,7 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
}
}
- @Override
- public int hashCode() {
- return Boolean.hashCode(syncOnFlush);
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other == null) {
- return false;
- }
- if (getClass() != other.getClass()) {
- return false;
- }
- StreamWriterBase<T> writer = (StreamWriterBase<T>) other;
- // field comparison
- return Objects.equals(syncOnFlush, writer.syncOnFlush);
+ public boolean isSyncOnFlush() {
+ return syncOnFlush;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 5c81b15..122bc7f 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
-import java.util.Objects;
/**
* A {@link Writer} that uses {@code toString()} on the input elements and writes them to
@@ -87,29 +86,11 @@ public class StringWriter<T> extends StreamWriterBase<T> {
}
@Override
- public Writer<T> duplicate() {
+ public StringWriter<T> duplicate() {
return new StringWriter<>(this);
}
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), charsetName);
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other == null) {
- return false;
- }
- if (getClass() != other.getClass()) {
- return false;
- }
- StringWriter<T> writer = (StringWriter<T>) other;
- // field comparison
- return Objects.equals(charsetName, writer.charsetName)
- && super.equals(other);
+ String getCharsetName() {
+ return charsetName;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
index 019e56d..864d9c1 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.connectors.fs;
-import org.apache.flink.api.java.tuple.Tuple2;
-
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileConstants;
import org.junit.Test;
@@ -47,11 +45,11 @@ public class AvroKeyValueSinkWriterTest {
AvroKeyValueSinkWriter<String, String> writer = new AvroKeyValueSinkWriter(properties);
writer.setSyncOnFlush(true);
- Writer<Tuple2<String, String>> other = writer.duplicate();
+ AvroKeyValueSinkWriter<String, String> other = writer.duplicate();
- assertTrue(writer.equals(other));
+ assertTrue(StreamWriterBaseComparator.equals(writer, other));
writer.setSyncOnFlush(false);
- assertFalse(writer.equals(other));
+ assertFalse(StreamWriterBaseComparator.equals(writer, other));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 93f6d52..86821a5 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -930,7 +930,7 @@ public class RollingSinkITCase extends TestLogger {
}
@Override
- public Writer<T> duplicate() {
+ public StreamWriterWithConfigCheck<T> duplicate() {
return new StreamWriterWithConfigCheck<>(key, expect);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
index 7ea2264..44716d3 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.streaming.connectors.fs;
-import org.apache.flink.api.java.tuple.Tuple2;
-
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;
@@ -36,11 +34,11 @@ public class SequenceFileWriterTest {
public void testDuplicate() {
SequenceFileWriter<Text, Text> writer = new SequenceFileWriter("BZ", SequenceFile.CompressionType.BLOCK);
writer.setSyncOnFlush(true);
- Writer<Tuple2<Text, Text>> other = writer.duplicate();
+ SequenceFileWriter<Text, Text> other = writer.duplicate();
- assertTrue(writer.equals(other));
+ assertTrue(StreamWriterBaseComparator.equals(writer, other));
writer.setSyncOnFlush(false);
- assertFalse(writer.equals(other));
+ assertFalse(StreamWriterBaseComparator.equals(writer, other));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
new file mode 100644
index 0000000..9472c29
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.io.Writable;
+
+import java.util.Objects;
+
+/**
+ * Helper class to perform partial comparisons of {@link StreamWriterBase} instances. Only the values
+ * exposed by the writers' getters are compared; changes in underlying output streams are ignored.
+ */
+public class StreamWriterBaseComparator {
+ /** Base comparison: two writers are considered equal if their {@code syncOnFlush} flags match. */
+ public static <T> boolean equals(
+ StreamWriterBase<T> writer1,
+ StreamWriterBase<T> writer2) {
+ return Objects.equals(writer1.isSyncOnFlush(), writer2.isSyncOnFlush());
+ }
+ /** Base comparison (the cast selects the {@link StreamWriterBase} overload) plus the properties. */
+ public static <K, V> boolean equals(
+ AvroKeyValueSinkWriter<K, V> writer1,
+ AvroKeyValueSinkWriter<K, V> writer2) {
+ return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) &&
+ Objects.equals(writer1.getProperties(), writer2.getProperties());
+ }
+ /** Base comparison plus the compression codec name, compression type, key class and value class. */
+ public static <K extends Writable, V extends Writable> boolean equals(
+ SequenceFileWriter<K, V> writer1,
+ SequenceFileWriter<K, V> writer2) {
+ return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) &&
+ Objects.equals(writer1.getCompressionCodecName(), writer2.getCompressionCodecName()) &&
+ Objects.equals(writer1.getCompressionType(), writer2.getCompressionType()) &&
+ Objects.equals(writer1.getKeyClass(), writer2.getKeyClass()) &&
+ Objects.equals(writer1.getValueClass(), writer2.getValueClass());
+ }
+ /** Base comparison plus the charset name. */
+ public static <T> boolean equals(
+ StringWriter<T> writer1,
+ StringWriter<T> writer2) {
+ return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) &&
+ Objects.equals(writer1.getCharsetName(), writer2.getCharsetName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 488f860..7009d94 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -34,12 +34,12 @@ public class StringWriterTest {
public void testDuplicate() {
StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
writer.setSyncOnFlush(true);
- Writer<String> other = writer.duplicate();
+ StringWriter<String> other = writer.duplicate();
- assertTrue(writer.equals(other));
+ assertTrue(StreamWriterBaseComparator.equals(writer, other));
writer.setSyncOnFlush(false);
- assertFalse(writer.equals(other));
- assertFalse(writer.equals(new StringWriter<>()));
+ assertFalse(StreamWriterBaseComparator.equals(writer, other));
+ assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 362c078..dc84846 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -918,7 +917,7 @@ public class BucketingSinkTest extends TestLogger {
}
@Override
- public Writer<Tuple2<K, V>> duplicate() {
+ public StreamWriterWithConfigCheck<K, V> duplicate() {
return new StreamWriterWithConfigCheck<>(properties, key, expect);
}
}