You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/07/05 13:21:00 UTC

flink git commit: [hotfix][filesystem] Remove incorrect equals methods in StreamWriters

Repository: flink
Updated Branches:
  refs/heads/master a9b5579b3 -> e236680f1


[hotfix][filesystem] Remove incorrect equals methods in StreamWriters

This closes #6262.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e236680f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e236680f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e236680f

Branch: refs/heads/master
Commit: e236680f14ebbaf97243dc15718aad93b68b584a
Parents: a9b5579
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Thu Jul 5 11:28:35 2018 +0200
Committer: Piotr Nowojski <pi...@gmail.com>
Committed: Thu Jul 5 15:19:41 2018 +0200

----------------------------------------------------------------------
 .../connectors/fs/AvroKeyValueSinkWriter.java   | 25 +-------
 .../connectors/fs/SequenceFileWriter.java       | 36 +++++-------
 .../connectors/fs/StreamWriterBase.java         | 22 +------
 .../streaming/connectors/fs/StringWriter.java   | 25 +-------
 .../fs/AvroKeyValueSinkWriterTest.java          |  8 +--
 .../connectors/fs/RollingSinkITCase.java        |  2 +-
 .../connectors/fs/SequenceFileWriterTest.java   |  8 +--
 .../fs/StreamWriterBaseComparator.java          | 60 ++++++++++++++++++++
 .../connectors/fs/StringWriterTest.java         |  8 +--
 .../fs/bucketing/BucketingSinkTest.java         |  3 +-
 10 files changed, 93 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 6b2f7d6..0f73e8c 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -40,7 +40,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Map;
-import java.util.Objects;
 
 /**
 * Implementation of AvroKeyValue writer that can be used in Sink.
@@ -204,7 +203,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 	}
 
 	@Override
-	public Writer<Tuple2<K, V>> duplicate() {
+	public AvroKeyValueSinkWriter<K, V> duplicate() {
 		return new AvroKeyValueSinkWriter<>(this);
 	}
 
@@ -335,25 +334,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 		}
 	}
 
-	@Override
-	public int hashCode() {
-		return Objects.hash(super.hashCode(), properties);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (this == other) {
-			return true;
-		}
-		if (other == null) {
-			return false;
-		}
-		if (getClass() != other.getClass()) {
-			return false;
-		}
-		AvroKeyValueSinkWriter<K, V> writer = (AvroKeyValueSinkWriter<K, V>) other;
-		// field comparison
-		return Objects.equals(properties, writer.properties)
-			&& super.equals(other);
+	Map<String, String> getProperties() {
+		return properties;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 2f42ef7..17b16dd 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
@@ -152,32 +151,23 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends
 	}
 
 	@Override
-	public Writer<Tuple2<K, V>> duplicate() {
+	public SequenceFileWriter<K, V> duplicate() {
 		return new SequenceFileWriter<>(this);
 	}
 
-	@Override
-	public int hashCode() {
-		return Objects.hash(super.hashCode(), compressionCodecName, compressionType, keyClass, valueClass);
+	String getCompressionCodecName() {
+		return compressionCodecName;
 	}
 
-	@Override
-	public boolean equals(Object other) {
-		if (this == other) {
-			return true;
-		}
-		if (other == null) {
-			return false;
-		}
-		if (getClass() != other.getClass()) {
-			return false;
-		}
-		SequenceFileWriter<K, V> writer = (SequenceFileWriter<K, V>) other;
-		// field comparison
-		return Objects.equals(compressionCodecName, writer.compressionCodecName)
-			&& Objects.equals(compressionType, writer.compressionType)
-			&& Objects.equals(keyClass, writer.keyClass)
-			&& Objects.equals(valueClass, writer.valueClass)
-			&& super.equals(other);
+	SequenceFile.CompressionType getCompressionType() {
+		return compressionType;
+	}
+
+	Class<K> getKeyClass() {
+		return keyClass;
+	}
+
+	Class<V> getValueClass() {
+		return valueClass;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index f625ef3..d3035a5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
@@ -102,24 +101,7 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
 		}
 	}
 
-	@Override
-	public int hashCode() {
-		return Boolean.hashCode(syncOnFlush);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (this == other) {
-			return true;
-		}
-		if (other == null) {
-			return false;
-		}
-		if (getClass() != other.getClass()) {
-			return false;
-		}
-		StreamWriterBase<T> writer = (StreamWriterBase<T>) other;
-		// field comparison
-		return Objects.equals(syncOnFlush, writer.syncOnFlush);
+	public boolean isSyncOnFlush() {
+		return syncOnFlush;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 5c81b15..122bc7f 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
-import java.util.Objects;
 
 /**
  * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
@@ -87,29 +86,11 @@ public class StringWriter<T> extends StreamWriterBase<T> {
 	}
 
 	@Override
-	public Writer<T> duplicate() {
+	public StringWriter<T> duplicate() {
 		return new StringWriter<>(this);
 	}
 
-	@Override
-	public int hashCode() {
-		return Objects.hash(super.hashCode(), charsetName);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (this == other) {
-			return true;
-		}
-		if (other == null) {
-			return false;
-		}
-		if (getClass() != other.getClass()) {
-			return false;
-		}
-		StringWriter<T> writer = (StringWriter<T>) other;
-		// field comparison
-		return Objects.equals(charsetName, writer.charsetName)
-			&& super.equals(other);
+	String getCharsetName() {
+		return charsetName;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
index 019e56d..864d9c1 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileConstants;
 import org.junit.Test;
@@ -47,11 +45,11 @@ public class AvroKeyValueSinkWriterTest {
 
 		AvroKeyValueSinkWriter<String, String> writer = new AvroKeyValueSinkWriter(properties);
 		writer.setSyncOnFlush(true);
-		Writer<Tuple2<String, String>> other = writer.duplicate();
+		AvroKeyValueSinkWriter<String, String> other = writer.duplicate();
 
-		assertTrue(writer.equals(other));
+		assertTrue(StreamWriterBaseComparator.equals(writer, other));
 
 		writer.setSyncOnFlush(false);
-		assertFalse(writer.equals(other));
+		assertFalse(StreamWriterBaseComparator.equals(writer, other));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 93f6d52..86821a5 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -930,7 +930,7 @@ public class RollingSinkITCase extends TestLogger {
 		}
 
 		@Override
-		public Writer<T> duplicate() {
+		public StreamWriterWithConfigCheck<T> duplicate() {
 			return new StreamWriterWithConfigCheck<>(key, expect);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
index 7ea2264..44716d3 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
@@ -36,11 +34,11 @@ public class SequenceFileWriterTest {
 	public void testDuplicate() {
 		SequenceFileWriter<Text, Text> writer = new SequenceFileWriter("BZ", SequenceFile.CompressionType.BLOCK);
 		writer.setSyncOnFlush(true);
-		Writer<Tuple2<Text, Text>> other = writer.duplicate();
+		SequenceFileWriter<Text, Text> other = writer.duplicate();
 
-		assertTrue(writer.equals(other));
+		assertTrue(StreamWriterBaseComparator.equals(writer, other));
 
 		writer.setSyncOnFlush(false);
-		assertFalse(writer.equals(other));
+		assertFalse(StreamWriterBaseComparator.equals(writer, other));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
new file mode 100644
index 0000000..9472c29
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.io.Writable;
+
+import java.util.Objects;
+
+/**
+ * Test helper that compares {@link StreamWriterBase} instances by the settings exposed through their
+ * getters. Underlying output streams are ignored, so the result is a partial (configuration-only) equality.
+ */
+public class StreamWriterBaseComparator {
+
+	/** Returns {@code true} if both writers have the same {@code syncOnFlush} setting. */
+	public static boolean equals(StreamWriterBase<?> writer1, StreamWriterBase<?> writer2) {
+		return writer1.isSyncOnFlush() == writer2.isSyncOnFlush();
+	}
+
+	/** Returns {@code true} if the base settings and the properties map of both writers are equal. */
+	public static <K, V> boolean equals(
+			AvroKeyValueSinkWriter<K, V> writer1,
+			AvroKeyValueSinkWriter<K, V> writer2) {
+		return equals((StreamWriterBase<?>) writer1, (StreamWriterBase<?>) writer2) &&
+			Objects.equals(writer1.getProperties(), writer2.getProperties());
+	}
+
+	/** Returns {@code true} if base settings, codec name, compression type and key/value classes are equal. */
+	public static <K extends Writable, V extends Writable> boolean equals(
+			SequenceFileWriter<K, V> writer1,
+			SequenceFileWriter<K, V> writer2) {
+		return equals((StreamWriterBase<?>) writer1, (StreamWriterBase<?>) writer2) &&
+			Objects.equals(writer1.getCompressionCodecName(), writer2.getCompressionCodecName()) &&
+			Objects.equals(writer1.getCompressionType(), writer2.getCompressionType()) &&
+			Objects.equals(writer1.getKeyClass(), writer2.getKeyClass()) &&
+			Objects.equals(writer1.getValueClass(), writer2.getValueClass());
+	}
+
+	/** Returns {@code true} if the base settings and the charset name of both writers are equal. */
+	public static <T> boolean equals(StringWriter<T> writer1, StringWriter<T> writer2) {
+		return equals((StreamWriterBase<?>) writer1, (StreamWriterBase<?>) writer2) &&
+			Objects.equals(writer1.getCharsetName(), writer2.getCharsetName());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 488f860..7009d94 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -34,12 +34,12 @@ public class StringWriterTest {
 	public void testDuplicate() {
 		StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
 		writer.setSyncOnFlush(true);
-		Writer<String> other = writer.duplicate();
+		StringWriter<String> other = writer.duplicate();
 
-		assertTrue(writer.equals(other));
+		assertTrue(StreamWriterBaseComparator.equals(writer, other));
 
 		writer.setSyncOnFlush(false);
-		assertFalse(writer.equals(other));
-		assertFalse(writer.equals(new StringWriter<>()));
+		assertFalse(StreamWriterBaseComparator.equals(writer, other));
+		assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 362c078..dc84846 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
 import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -918,7 +917,7 @@ public class BucketingSinkTest extends TestLogger {
 		}
 
 		@Override
-		public Writer<Tuple2<K, V>> duplicate() {
+		public StreamWriterWithConfigCheck<K, V> duplicate() {
 			return new StreamWriterWithConfigCheck<>(properties, key, expect);
 		}
 	}