You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/05/15 15:44:23 UTC

[iceberg] branch master updated: Flink: add toString, equals, hashCode overrides for RowDataProjection. (#7493)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 198fb723dd Flink: add toString, equals, hashCode overrides for RowDataProjection. (#7493)
198fb723dd is described below

commit 198fb723dd50c15e1129bcbcaaaa97832503a74d
Author: Steven Zhen Wu <st...@gmail.com>
AuthorDate: Mon May 15 08:44:17 2023 -0700

    Flink: add toString, equals, hashCode overrides for RowDataProjection. (#7493)
    
    * Flink: add toString, equals, hashCode overrides for RowDataProjection.
    
    Flink RowData implementation classes (like GenericRowData) all implement proper overrides for those methods. We also intend to use RowDataProjection as map key for MapDataStatistics from sink shuffling work. Hence this change is also required.
---
 .../iceberg/flink/data/RowDataProjection.java      |  82 +++-
 .../org/apache/iceberg/flink/DataGenerator.java    |   4 +-
 .../org/apache/iceberg/flink/DataGenerators.java   | 198 ++++++---
 .../iceberg/flink/data/TestRowDataProjection.java  | 488 ++++++++++++++++-----
 4 files changed, 580 insertions(+), 192 deletions(-)

diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
index 6d7b8f76e0..33816c97ac 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iceberg.flink.data;
 
+import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.MapData;
@@ -28,6 +30,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.StringUtils;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -155,11 +158,17 @@ public class RowDataProjection implements RowData {
   }
 
   public RowData wrap(RowData row) {
+    // StructProjection allows wrapping a null root struct object.
+    // See more discussions in https://github.com/apache/iceberg/pull/7517.
+    // RowDataProjection has never allowed a null root object to be wrapped.
+    // Hence, it is fine to enforce strict Preconditions check here.
+    Preconditions.checkArgument(row != null, "Invalid row data: null");
     this.rowData = row;
     return this;
   }
 
   private Object getValue(int pos) {
+    Preconditions.checkState(rowData != null, "Row data not wrapped");
     return getters[pos].getFieldOrNull(rowData);
   }
 
@@ -170,6 +179,7 @@ public class RowDataProjection implements RowData {
 
   @Override
   public RowKind getRowKind() {
+    Preconditions.checkState(rowData != null, "Row data not wrapped");
     return rowData.getRowKind();
   }
 
@@ -180,7 +190,7 @@ public class RowDataProjection implements RowData {
 
   @Override
   public boolean isNullAt(int pos) {
-    return rowData == null || getValue(pos) == null;
+    return getValue(pos) == null;
   }
 
   @Override
@@ -258,4 +268,74 @@ public class RowDataProjection implements RowData {
   public RowData getRow(int pos, int numFields) {
     return (RowData) getValue(pos);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof RowDataProjection)) {
+      return false;
+    }
+
+    RowDataProjection that = (RowDataProjection) o;
+    return deepEquals(that);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hashCode(getRowKind());
+    for (int pos = 0; pos < getArity(); pos++) {
+      if (!isNullAt(pos)) {
+        // Arrays.deepHashCode handles array object properly
+        result = 31 * result + Arrays.deepHashCode(new Object[] {getValue(pos)});
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getRowKind().shortString()).append("(");
+    for (int pos = 0; pos < getArity(); pos++) {
+      if (pos != 0) {
+        sb.append(",");
+      }
+      // copied the behavior from Flink GenericRowData
+      sb.append(StringUtils.arrayAwareToString(getValue(pos)));
+    }
+
+    sb.append(")");
+    return sb.toString();
+  }
+
+  private boolean deepEquals(RowDataProjection other) {
+    if (getRowKind() != other.getRowKind()) {
+      return false;
+    }
+
+    if (getArity() != other.getArity()) {
+      return false;
+    }
+
+    for (int pos = 0; pos < getArity(); ++pos) {
+      if (isNullAt(pos) && other.isNullAt(pos)) {
+        continue;
+      }
+
+      if ((isNullAt(pos) && !other.isNullAt(pos)) || (!isNullAt(pos) && other.isNullAt(pos))) {
+        return false;
+      }
+
+      // Objects.deepEquals handles array object properly
+      if (!Objects.deepEquals(getValue(pos), other.getValue(pos))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
 }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerator.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerator.java
index 868157a6ef..b1e3b20ff7 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerator.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iceberg.flink;
 
-import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
@@ -36,7 +36,7 @@ public interface DataGenerator {
 
   GenericRecord generateIcebergGenericRecord();
 
-  RowData generateFlinkRowData();
+  GenericRowData generateFlinkRowData();
 
   org.apache.avro.generic.GenericRecord generateAvroGenericRecord();
 }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
index 1363c152e0..e2cd411d70 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
@@ -42,7 +42,6 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.RowType;
@@ -81,7 +80,7 @@ public class DataGenerators {
 
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             // primitive types
             Types.NestedField.optional(2, "boolean_field", Types.BooleanType.get()),
             Types.NestedField.optional(3, "int_field", Types.IntegerType.get()),
@@ -160,7 +159,7 @@ public class DataGenerators {
     @Override
     public GenericRecord generateIcebergGenericRecord() {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("boolean_field", false);
       genericRecord.setField("int_field", Integer.MAX_VALUE);
       genericRecord.setField("long_field", Long.MAX_VALUE);
@@ -193,7 +192,7 @@ public class DataGenerators {
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       byte[] uuidBytes = new byte[16];
       for (int i = 0; i < 16; ++i) {
         uuidBytes[i] = (byte) i;
@@ -205,7 +204,7 @@ public class DataGenerators {
       }
 
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           false,
           Integer.MAX_VALUE,
           Long.MAX_VALUE,
@@ -227,7 +226,7 @@ public class DataGenerators {
     @Override
     public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", new Utf8("partition_value"));
+      genericRecord.put("row_id", new Utf8("row_id_value"));
       genericRecord.put("boolean_field", false);
       genericRecord.put("int_field", Integer.MAX_VALUE);
       genericRecord.put("long_field", Long.MAX_VALUE);
@@ -246,7 +245,6 @@ public class DataGenerators {
       for (int i = 0; i < 16; ++i) {
         uuidBytes[i] = (byte) i;
       }
-      org.apache.avro.Schema uuidFieldSchema = avroSchema.getField("uuid_field").schema();
       genericRecord.put("uuid_field", ByteBuffer.wrap(uuidBytes));
 
       byte[] binaryBytes = new byte[7];
@@ -269,7 +267,7 @@ public class DataGenerators {
   public static class StructOfPrimitive implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "struct_of_primitive",
@@ -305,15 +303,15 @@ public class DataGenerators {
       struct.setField("id", 1);
       struct.setField("name", "Jane");
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("struct_of_primitive", struct);
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           GenericRowData.of(1, StringData.fromString("Jane")));
     }
 
@@ -324,7 +322,7 @@ public class DataGenerators {
       struct.put("id", 1);
       struct.put("name", "Jane");
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put("struct_of_primitive", struct);
       return genericRecord;
     }
@@ -333,7 +331,7 @@ public class DataGenerators {
   public static class StructOfArray implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "struct_of_array",
@@ -370,17 +368,16 @@ public class DataGenerators {
       struct.setField("id", 1);
       struct.setField("names", Arrays.asList("Jane", "Joe"));
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("struct_of_array", struct);
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       StringData[] names = {StringData.fromString("Jane"), StringData.fromString("Joe")};
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
-          GenericRowData.of(1, new GenericArrayData(names)));
+          StringData.fromString("row_id_value"), GenericRowData.of(1, new GenericArrayData(names)));
     }
 
     @Override
@@ -390,7 +387,7 @@ public class DataGenerators {
       struct.put("id", 1);
       struct.put("names", Arrays.asList("Jane", "Joe"));
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put("struct_of_array", struct);
       return genericRecord;
     }
@@ -399,7 +396,7 @@ public class DataGenerators {
   public static class StructOfMap implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "struct_of_map",
@@ -439,15 +436,15 @@ public class DataGenerators {
       struct.setField("id", 1);
       struct.setField("names", ImmutableMap.of("Jane", "female", "Joe", "male"));
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("struct_of_map", struct);
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           GenericRowData.of(
               1,
               new GenericMapData(
@@ -465,7 +462,7 @@ public class DataGenerators {
       struct.put("id", 1);
       struct.put("names", ImmutableMap.of("Jane", new Utf8("female"), "Joe", new Utf8("male")));
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put("struct_of_map", struct);
       return genericRecord;
     }
@@ -474,7 +471,7 @@ public class DataGenerators {
   public static class StructOfStruct implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "struct_of_struct",
@@ -520,15 +517,15 @@ public class DataGenerators {
       struct.setField("id", 1);
       struct.setField("person_struct", person);
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("struct_of_struct", struct);
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           GenericRowData.of(
               1,
               GenericRowData.of(
@@ -546,7 +543,7 @@ public class DataGenerators {
       struct.put("id", 1);
       struct.put("person_struct", person);
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put("struct_of_struct", struct);
       return genericRecord;
     }
@@ -555,9 +552,9 @@ public class DataGenerators {
   public static class ArrayOfPrimitive implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
-                2, "array_of_int", Types.ListType.ofRequired(101, Types.IntegerType.get())));
+                2, "array_of_int", Types.ListType.ofOptional(101, Types.IntegerType.get())));
 
     private final RowType flinkRowType = FlinkSchemaUtil.convert(icebergSchema);
 
@@ -582,21 +579,21 @@ public class DataGenerators {
     @Override
     public GenericRecord generateIcebergGenericRecord() {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("array_of_int", Arrays.asList(1, 2, 3));
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       Integer[] arr = {1, 2, 3};
-      return GenericRowData.of(StringData.fromString("partition_value"), new GenericArrayData(arr));
+      return GenericRowData.of(StringData.fromString("row_id_value"), new GenericArrayData(arr));
     }
 
     @Override
     public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put("array_of_int", Arrays.asList(1, 2, 3));
       return genericRecord;
     }
@@ -605,7 +602,7 @@ public class DataGenerators {
   public static class ArrayOfArray implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "array_of_array",
@@ -635,14 +632,14 @@ public class DataGenerators {
     @Override
     public GenericRecord generateIcebergGenericRecord() {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField(
           "array_of_array", Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)));
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       // non-primitive
       Integer[] array1 = {1, 2, 3};
       Integer[] array2 = {4, 5, 6};
@@ -650,13 +647,13 @@ public class DataGenerators {
         new GenericArrayData(array1), new GenericArrayData(array2)
       };
       return GenericRowData.of(
-          StringData.fromString("partition_value"), new GenericArrayData(arrayOfArrays));
+          StringData.fromString("row_id_value"), new GenericArrayData(arrayOfArrays));
     }
 
     @Override
     public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put(
           "array_of_array", Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)));
       return genericRecord;
@@ -666,7 +663,7 @@ public class DataGenerators {
   public static class ArrayOfMap implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "array_of_map",
@@ -698,7 +695,7 @@ public class DataGenerators {
     @Override
     public GenericRecord generateIcebergGenericRecord() {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField(
           "array_of_map",
           Arrays.asList(
@@ -707,21 +704,20 @@ public class DataGenerators {
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       GenericMapData[] array = {
         new GenericMapData(
             ImmutableMap.of(StringData.fromString("Jane"), 1, StringData.fromString("Joe"), 2)),
         new GenericMapData(
             ImmutableMap.of(StringData.fromString("Alice"), 3, StringData.fromString("Bob"), 4))
       };
-      return GenericRowData.of(
-          StringData.fromString("partition_value"), new GenericArrayData(array));
+      return GenericRowData.of(StringData.fromString("row_id_value"), new GenericArrayData(array));
     }
 
     @Override
     public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put(
           "array_of_map",
           Arrays.asList(
@@ -741,8 +737,8 @@ public class DataGenerators {
 
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
-            Types.NestedField.required(
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
+            Types.NestedField.optional(
                 2, "array_of_struct", Types.ListType.ofRequired(101, structType)));
 
     private final RowType flinkRowType = FlinkSchemaUtil.convert(icebergSchema);
@@ -774,19 +770,19 @@ public class DataGenerators {
       struct2.setField("id", 2);
       struct2.setField("name", "Joe");
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("array_of_struct", Arrays.asList(struct1, struct2));
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       GenericRowData[] structArray = {
         GenericRowData.of(1, StringData.fromString("Jane")),
         GenericRowData.of(2, StringData.fromString("Joe"))
       };
       return GenericRowData.of(
-          StringData.fromString("partition_value"), new GenericArrayData(structArray));
+          StringData.fromString("row_id_value"), new GenericArrayData(structArray));
     }
 
     @Override
@@ -798,7 +794,7 @@ public class DataGenerators {
       struct2.put("id", 2);
       struct2.put("name", "Joe");
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put("array_of_struct", Arrays.asList(struct1, struct2));
       return genericRecord;
     }
@@ -807,8 +803,8 @@ public class DataGenerators {
   public static class MapOfPrimitives implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
-            Types.NestedField.required(
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
+            Types.NestedField.optional(
                 2,
                 "map_of_primitives",
                 Types.MapType.ofRequired(
@@ -837,15 +833,15 @@ public class DataGenerators {
     @Override
     public GenericRecord generateIcebergGenericRecord() {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("map_of_primitives", ImmutableMap.of("Jane", 1, "Joe", 2));
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           new GenericMapData(
               ImmutableMap.of(StringData.fromString("Jane"), 1, StringData.fromString("Joe"), 2)));
     }
@@ -853,7 +849,7 @@ public class DataGenerators {
     @Override
     public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put("map_of_primitives", ImmutableMap.of("Jane", 1, "Joe", 2));
       return genericRecord;
     }
@@ -862,7 +858,7 @@ public class DataGenerators {
   public static class MapOfArray implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "map_of_array",
@@ -895,7 +891,7 @@ public class DataGenerators {
     @Override
     public GenericRecord generateIcebergGenericRecord() {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField(
           "map_of_array",
           ImmutableMap.of(
@@ -905,11 +901,11 @@ public class DataGenerators {
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       Integer[] janeArray = {1, 2, 3};
       Integer[] joeArray = {4, 5, 6};
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           new GenericMapData(
               ImmutableMap.of(
                   StringData.fromString("Jane"),
@@ -921,7 +917,7 @@ public class DataGenerators {
     @Override
     public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put(
           "map_of_array",
           ImmutableMap.of(
@@ -934,7 +930,7 @@ public class DataGenerators {
   public static class MapOfMap implements DataGenerator {
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "map_of_map",
@@ -968,7 +964,7 @@ public class DataGenerators {
     @Override
     public GenericRecord generateIcebergGenericRecord() {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField(
           "map_of_map",
           ImmutableMap.of(
@@ -978,9 +974,9 @@ public class DataGenerators {
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           new GenericMapData(
               ImmutableMap.of(
                   StringData.fromString("female"),
@@ -996,7 +992,7 @@ public class DataGenerators {
     @Override
     public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", "partition_value");
+      genericRecord.put("row_id", "row_id_value");
       genericRecord.put(
           "map_of_map",
           ImmutableMap.of(
@@ -1041,7 +1037,7 @@ public class DataGenerators {
 
     private final Schema icebergSchema =
         new Schema(
-            Types.NestedField.required(1, "partition_field", Types.StringType.get()),
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
                 2,
                 "map_of_struct",
@@ -1055,7 +1051,7 @@ public class DataGenerators {
         SchemaBuilder.builder()
             .record("table")
             .fields()
-            .requiredString("partition_field")
+            .requiredString("row_id")
             .name("map_of_struct")
             .type(SchemaBuilder.builder().map().values(structAvroSchema))
             .noDefault()
@@ -1085,16 +1081,16 @@ public class DataGenerators {
       struct2.setField("id", 2);
       struct2.setField("name", "Joe");
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
-      genericRecord.setField("partition_field", "partition_value");
+      genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField(
           "map_of_struct", ImmutableMap.of("struct1", struct1, "struct2", struct2));
       return genericRecord;
     }
 
     @Override
-    public RowData generateFlinkRowData() {
+    public GenericRowData generateFlinkRowData() {
       return GenericRowData.of(
-          StringData.fromString("partition_value"),
+          StringData.fromString("row_id_value"),
           new GenericMapData(
               ImmutableMap.of(
                   StringData.fromString("struct1"),
@@ -1112,9 +1108,65 @@ public class DataGenerators {
       struct2.put("id", 2);
       struct2.put("name", new Utf8("Joe"));
       org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema);
-      genericRecord.put("partition_field", new Utf8("partition_value"));
+      genericRecord.put("row_id", new Utf8("row_id_value"));
       genericRecord.put("map_of_struct", ImmutableMap.of("struct1", struct1, "struct2", struct2));
       return genericRecord;
     }
   }
+
+  /**
+   * Generator for a schema containing a map column whose key and value are both structs.
+   * Only the Flink {@link GenericRowData} path is implemented; the Avro paths throw because
+   * Avro maps only support string keys, and the Iceberg generic-record path is not
+   * implemented yet.
+   */
+  public static class MapOfStructStruct implements DataGenerator {
+    private final Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
+            Types.NestedField.optional(
+                2,
+                "map",
+                Types.MapType.ofOptional(
+                    101,
+                    102,
+                    Types.StructType.of(
+                        Types.NestedField.required(201, "key", Types.LongType.get()),
+                        Types.NestedField.optional(202, "keyData", Types.StringType.get())),
+                    Types.StructType.of(
+                        Types.NestedField.required(203, "value", Types.LongType.get()),
+                        Types.NestedField.optional(204, "valueData", Types.StringType.get())))));
+
+    private final RowType flinkRowType = FlinkSchemaUtil.convert(icebergSchema);
+
+    @Override
+    public Schema icebergSchema() {
+      return icebergSchema;
+    }
+
+    @Override
+    public RowType flinkRowType() {
+      return flinkRowType;
+    }
+
+    @Override
+    public org.apache.avro.Schema avroSchema() {
+      // Avro map keys must be strings, so this struct-keyed map has no Avro representation.
+      throw new UnsupportedOperationException(
+          "Not applicable as Avro Map only support string key type");
+    }
+
+    @Override
+    public GenericRecord generateIcebergGenericRecord() {
+      throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public GenericRowData generateFlinkRowData() {
+      // Single-entry map: struct key (1L, "key_data") -> struct value (1L, "value_data").
+      return GenericRowData.of(
+          StringData.fromString("row_id_value"),
+          new GenericMapData(
+              ImmutableMap.of(
+                  GenericRowData.of(1L, StringData.fromString("key_data")),
+                  GenericRowData.of(1L, StringData.fromString("value_data")))));
+    }
+
+    @Override
+    public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() {
+      throw new UnsupportedOperationException("Avro Map only support string key type");
+    }
+  }
 }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
index a3bbb9ae25..3cd25c8fa9 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
@@ -18,20 +18,40 @@
  */
 package org.apache.iceberg.flink.data;
 
-import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.DataGenerator;
+import org.apache.iceberg.flink.DataGenerators;
 import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructProjection;
 import org.assertj.core.api.Assertions;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 public class TestRowDataProjection {
+  @Test
+  public void testNullRootRowData() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(0, "id", Types.LongType.get()),
+            Types.NestedField.optional(1, "data", Types.StringType.get()));
+
+    RowDataProjection projection = RowDataProjection.create(schema, schema.select("id"));
+
+    Assertions.assertThatThrownBy(() -> projection.wrap(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid row data: null");
+  }
 
   @Test
   public void testFullProjection() {
@@ -41,6 +61,11 @@ public class TestRowDataProjection {
             Types.NestedField.optional(1, "data", Types.StringType.get()));
 
     generateAndValidate(schema, schema);
+
+    GenericRowData rowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData copyRowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData otherRowData = GenericRowData.of(2L, StringData.fromString("b"));
+    testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
   }
 
   @Test
@@ -56,6 +81,11 @@ public class TestRowDataProjection {
             Types.NestedField.required(0, "id", Types.LongType.get()));
 
     generateAndValidate(schema, reordered);
+
+    GenericRowData rowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData copyRowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData otherRowData = GenericRowData.of(2L, StringData.fromString("b"));
+    testEqualsAndHashCode(schema, reordered, rowData, copyRowData, otherRowData);
   }
 
   @Test
@@ -64,10 +94,16 @@ public class TestRowDataProjection {
         new Schema(
             Types.NestedField.required(0, "id", Types.LongType.get()),
             Types.NestedField.optional(1, "data", Types.StringType.get()));
-    Schema id = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));
-    Schema data = new Schema(Types.NestedField.optional(1, "data", Types.StringType.get()));
-    generateAndValidate(schema, id);
-    generateAndValidate(schema, data);
+    Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));
+    Schema dataOnly = new Schema(Types.NestedField.optional(1, "data", Types.StringType.get()));
+    generateAndValidate(schema, idOnly);
+    generateAndValidate(schema, dataOnly);
+
+    GenericRowData rowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData copyRowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData otherRowData = GenericRowData.of(2L, StringData.fromString("b"));
+    testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(schema, dataOnly, rowData, copyRowData, otherRowData);
   }
 
   @Test
@@ -77,6 +113,11 @@ public class TestRowDataProjection {
             Types.NestedField.required(0, "id", Types.LongType.get()),
             Types.NestedField.optional(1, "data", Types.StringType.get()));
     generateAndValidate(schema, schema.select());
+
+    GenericRowData rowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData copyRowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData otherRowData = GenericRowData.of(2L, StringData.fromString("b"));
+    testEqualsAndHashCode(schema, schema.select(), rowData, copyRowData, otherRowData, true);
   }
 
   @Test
@@ -91,6 +132,11 @@ public class TestRowDataProjection {
             Types.NestedField.required(0, "id", Types.LongType.get()),
             Types.NestedField.optional(1, "renamed", Types.StringType.get()));
     generateAndValidate(schema, renamed);
+
+    GenericRowData rowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData copyRowData = GenericRowData.of(1L, StringData.fromString("a"));
+    GenericRowData otherRowData = GenericRowData.of(2L, StringData.fromString("b"));
+    testEqualsAndHashCode(schema, renamed, rowData, copyRowData, otherRowData);
   }
 
   @Test
@@ -105,9 +151,21 @@ public class TestRowDataProjection {
                     Types.NestedField.required(1, "lat", Types.FloatType.get()),
                     Types.NestedField.required(2, "long", Types.FloatType.get()))));
 
+    GenericRowData rowData = GenericRowData.of(1L, GenericRowData.of(1.0f, 1.0f));
+    GenericRowData copyRowData = GenericRowData.of(1L, GenericRowData.of(1.0f, 1.0f));
+    GenericRowData otherRowData = GenericRowData.of(2L, GenericRowData.of(2.0f, 2.0f));
+
+    GenericRowData rowDataNullStruct = GenericRowData.of(1L, null);
+    GenericRowData copyRowDataNullStruct = GenericRowData.of(1L, null);
+    GenericRowData otherRowDataNullStruct = GenericRowData.of(2L, null);
+
     // Project id only.
     Schema idOnly = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));
+    Assertions.assertThat(idOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, idOnly);
+    testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(
+        schema, idOnly, rowDataNullStruct, copyRowDataNullStruct, otherRowDataNullStruct);
 
     // Project lat only.
     Schema latOnly =
@@ -116,7 +174,11 @@ public class TestRowDataProjection {
                 3,
                 "location",
                 Types.StructType.of(Types.NestedField.required(1, "lat", Types.FloatType.get()))));
+    Assertions.assertThat(latOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, latOnly);
+    testEqualsAndHashCode(schema, latOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(
+        schema, latOnly, rowDataNullStruct, copyRowDataNullStruct, otherRowDataNullStruct, true);
 
     // Project long only.
     Schema longOnly =
@@ -125,86 +187,133 @@ public class TestRowDataProjection {
                 3,
                 "location",
                 Types.StructType.of(Types.NestedField.required(2, "long", Types.FloatType.get()))));
+    Assertions.assertThat(longOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, longOnly);
+    testEqualsAndHashCode(schema, longOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(
+        schema, longOnly, rowDataNullStruct, copyRowDataNullStruct, otherRowDataNullStruct, true);
 
     // Project location.
     Schema locationOnly = schema.select("location");
+    Assertions.assertThat(locationOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, locationOnly);
+    testEqualsAndHashCode(schema, locationOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(
+        schema,
+        locationOnly,
+        rowDataNullStruct,
+        copyRowDataNullStruct,
+        otherRowDataNullStruct,
+        true);
   }
 
   @Test
-  public void testPrimitiveTypeProjection() {
-    Schema schema =
-        new Schema(
-            Types.NestedField.required(0, "id", Types.LongType.get()),
-            Types.NestedField.optional(1, "data", Types.StringType.get()),
-            Types.NestedField.required(2, "b", Types.BooleanType.get()),
-            Types.NestedField.optional(3, "i", Types.IntegerType.get()),
-            Types.NestedField.required(4, "l", Types.LongType.get()),
-            Types.NestedField.optional(5, "f", Types.FloatType.get()),
-            Types.NestedField.required(6, "d", Types.DoubleType.get()),
-            Types.NestedField.optional(7, "date", Types.DateType.get()),
-            Types.NestedField.optional(8, "time", Types.TimeType.get()),
-            Types.NestedField.required(9, "ts", Types.TimestampType.withoutZone()),
-            Types.NestedField.required(10, "ts_tz", Types.TimestampType.withZone()),
-            Types.NestedField.required(11, "s", Types.StringType.get()),
-            Types.NestedField.required(12, "fixed", Types.FixedType.ofLength(7)),
-            Types.NestedField.optional(13, "bytes", Types.BinaryType.get()),
-            Types.NestedField.required(14, "dec_9_0", Types.DecimalType.of(9, 0)),
-            Types.NestedField.required(15, "dec_11_2", Types.DecimalType.of(11, 2)),
-            Types.NestedField.required(
-                16, "dec_38_10", Types.DecimalType.of(38, 10)) // maximum precision
-            );
-
+  public void testPrimitivesFullProjection() {
+    DataGenerator dataGenerator = new DataGenerators.Primitives();
+    Schema schema = dataGenerator.icebergSchema();
     generateAndValidate(schema, schema);
+
+    GenericRowData rowData = dataGenerator.generateFlinkRowData();
+    GenericRowData copyRowData = dataGenerator.generateFlinkRowData();
+    GenericRowData otherRowData = dataGenerator.generateFlinkRowData();
+    // modify the string field value (position 6)
+    otherRowData.setField(6, StringData.fromString("foo_bar"));
+    testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
+
+    // Repeat the equals/hashCode checks with all optional fields set to null,
+    // so projection must cope with null values in the wrapped row.
+    GenericRowData rowDataNullOptionalFields = dataGenerator.generateFlinkRowData();
+    setOptionalFieldsNullForPrimitives(rowDataNullOptionalFields);
+    GenericRowData copyRowDataNullOptionalFields = dataGenerator.generateFlinkRowData();
+    setOptionalFieldsNullForPrimitives(copyRowDataNullOptionalFields);
+    GenericRowData otherRowDataNullOptionalFields = dataGenerator.generateFlinkRowData();
+    // modify the string field value (position 6)
+    otherRowDataNullOptionalFields.setField(6, StringData.fromString("foo_bar"));
+    // Null the optional fields on the third row too, so all three rows share the same
+    // null pattern. (Previously this mistakenly mutated otherRowData from the block above,
+    // leaving otherRowDataNullOptionalFields with its optional fields still populated.)
+    setOptionalFieldsNullForPrimitives(otherRowDataNullOptionalFields);
+    testEqualsAndHashCode(
+        schema,
+        schema,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
+  }
+
+  private void setOptionalFieldsNullForPrimitives(GenericRowData rowData) {
+    // fields from [1, 5] range are optional
+    for (int pos = 1; pos <= 5; ++pos) {
+      rowData.setField(pos, null);
+    }
   }
 
   @Test
-  public void testPrimitiveMapTypeProjection() {
-    Schema schema =
-        new Schema(
-            Types.NestedField.required(0, "id", Types.LongType.get()),
-            Types.NestedField.optional(
-                3,
-                "map",
-                Types.MapType.ofOptional(1, 2, Types.IntegerType.get(), Types.StringType.get())));
+  public void testMapOfPrimitivesProjection() {
+    DataGenerator dataGenerator = new DataGenerators.MapOfPrimitives();
+    Schema schema = dataGenerator.icebergSchema();
 
     // Project id only.
-    Schema idOnly = schema.select("id");
+    Schema idOnly = schema.select("row_id");
+    Assertions.assertThat(idOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, idOnly);
 
     // Project map only.
-    Schema mapOnly = schema.select("map");
+    Schema mapOnly = schema.select("map_of_primitives");
+    Assertions.assertThat(mapOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, mapOnly);
 
     // Project all.
     generateAndValidate(schema, schema);
+
+    GenericRowData rowData = dataGenerator.generateFlinkRowData();
+    GenericRowData copyRowData = dataGenerator.generateFlinkRowData();
+    // modify the map field value
+    GenericRowData otherRowData =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericMapData(
+                ImmutableMap.of(StringData.fromString("foo"), 1, StringData.fromString("bar"), 2)));
+    testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData, true);
+    testEqualsAndHashCode(schema, mapOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
+
+    GenericRowData rowDataNullOptionalFields =
+        GenericRowData.of(StringData.fromString("row_id_value"), null);
+    GenericRowData copyRowDataNullOptionalFields =
+        GenericRowData.of(StringData.fromString("row_id_value"), null);
+    // modify the row_id field value (the map field remains null)
+    GenericRowData otherRowDataNullOptionalFields =
+        GenericRowData.of(StringData.fromString("other_row_id_value"), null);
+    testEqualsAndHashCode(
+        schema,
+        idOnly,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
+    testEqualsAndHashCode(
+        schema,
+        mapOnly,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields,
+        true);
+    testEqualsAndHashCode(
+        schema,
+        schema,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
   }
 
   @Test
-  public void testNestedMapTypeProjection() {
-    Schema schema =
-        new Schema(
-            Types.NestedField.required(0, "id", Types.LongType.get()),
-            Types.NestedField.optional(
-                7,
-                "map",
-                Types.MapType.ofOptional(
-                    5,
-                    6,
-                    Types.StructType.of(
-                        Types.NestedField.required(1, "key", Types.LongType.get()),
-                        Types.NestedField.required(2, "keyData", Types.LongType.get())),
-                    Types.StructType.of(
-                        Types.NestedField.required(3, "value", Types.LongType.get()),
-                        Types.NestedField.required(4, "valueData", Types.LongType.get())))));
+  public void testMapOfStructStructProjection() {
+    DataGenerator dataGenerator = new DataGenerators.MapOfStructStruct();
+    Schema schema = dataGenerator.icebergSchema();
 
     // Project id only.
-    Schema idOnly = schema.select("id");
+    Schema idOnly = schema.select("row_id");
+    Assertions.assertThat(idOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, idOnly);
 
     // Project map only.
     Schema mapOnly = schema.select("map");
+    Assertions.assertThat(mapOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, mapOnly);
 
     // Project all.
@@ -214,82 +323,164 @@ public class TestRowDataProjection {
     Schema partialMapKey =
         new Schema(
             Types.NestedField.optional(
-                7,
+                2,
                 "map",
                 Types.MapType.ofOptional(
-                    5,
-                    6,
-                    Types.StructType.of(Types.NestedField.required(1, "key", Types.LongType.get())),
+                    101,
+                    102,
                     Types.StructType.of(
-                        Types.NestedField.required(3, "value", Types.LongType.get()),
-                        Types.NestedField.required(4, "valueData", Types.LongType.get())))));
-
+                        Types.NestedField.required(201, "key", Types.LongType.get())),
+                    Types.StructType.of(
+                        Types.NestedField.required(203, "value", Types.LongType.get()),
+                        Types.NestedField.required(204, "valueData", Types.StringType.get())))));
     Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialMapKey))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageStartingWith("Cannot project a partial map key or value struct.");
+        .hasMessageContaining("Cannot project a partial map key or value struct.");
 
     // Project partial map value.
     Schema partialMapValue =
         new Schema(
             Types.NestedField.optional(
-                7,
+                2,
                 "map",
                 Types.MapType.ofOptional(
-                    5,
-                    6,
+                    101,
+                    102,
                     Types.StructType.of(
-                        Types.NestedField.required(1, "key", Types.LongType.get()),
-                        Types.NestedField.required(2, "keyData", Types.LongType.get())),
+                        Types.NestedField.required(201, "key", Types.LongType.get()),
+                        Types.NestedField.required(202, "keyData", Types.StringType.get())),
                     Types.StructType.of(
-                        Types.NestedField.required(3, "value", Types.LongType.get())))));
-
+                        Types.NestedField.required(203, "value", Types.LongType.get())))));
     Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialMapValue))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageStartingWith("Cannot project a partial map key or value struct.");
+        .hasMessageContaining("Cannot project a partial map key or value struct.");
+
+    GenericRowData rowData = dataGenerator.generateFlinkRowData();
+    GenericRowData copyRowData = dataGenerator.generateFlinkRowData();
+    // modify the row_id and map field values
+    GenericRowData otherRowData =
+        GenericRowData.of(
+            StringData.fromString("other_row_id_value"),
+            new GenericMapData(
+                ImmutableMap.of(
+                    GenericRowData.of(1L, StringData.fromString("other_key_data")),
+                    GenericRowData.of(1L, StringData.fromString("other_value_data")))));
+    testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(schema, mapOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
+
+    GenericRowData rowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericMapData(
+                ImmutableMap.of(GenericRowData.of(1L, null), GenericRowData.of(1L, null))));
+    GenericRowData copyRowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericMapData(
+                ImmutableMap.of(GenericRowData.of(1L, null), GenericRowData.of(1L, null))));
+    // modify the row_id and map field values
+    GenericRowData otherRowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("other_row_id_value"),
+            new GenericMapData(
+                ImmutableMap.of(GenericRowData.of(2L, null), GenericRowData.of(2L, null))));
+    testEqualsAndHashCode(
+        schema,
+        idOnly,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
+    testEqualsAndHashCode(
+        schema,
+        mapOnly,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
+    testEqualsAndHashCode(
+        schema,
+        schema,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
   }
 
   @Test
-  public void testPrimitiveListTypeProjection() {
-    Schema schema =
-        new Schema(
-            Types.NestedField.required(0, "id", Types.LongType.get()),
-            Types.NestedField.optional(
-                2, "list", Types.ListType.ofOptional(1, Types.StringType.get())));
+  public void testArrayOfPrimitiveProjection() {
+    DataGenerator dataGenerator = new DataGenerators.ArrayOfPrimitive();
+    Schema schema = dataGenerator.icebergSchema();
 
     // Project id only.
-    Schema idOnly = schema.select("id");
+    Schema idOnly = schema.select("row_id");
+    Assertions.assertThat(idOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, idOnly);
 
     // Project list only.
-    Schema mapOnly = schema.select("list");
-    generateAndValidate(schema, mapOnly);
+    Schema arrayOnly = schema.select("array_of_int");
+    Assertions.assertThat(arrayOnly.columns().size()).isGreaterThan(0);
+    generateAndValidate(schema, arrayOnly);
 
     // Project all.
     generateAndValidate(schema, schema);
+
+    GenericRowData rowData = dataGenerator.generateFlinkRowData();
+    GenericRowData copyRowData = dataGenerator.generateFlinkRowData();
+    // modify the row_id and array field values
+    GenericRowData otherRowData =
+        GenericRowData.of(
+            StringData.fromString("other_row_id_value"),
+            new GenericArrayData(new Integer[] {4, 5, 6}));
+    testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(schema, arrayOnly, rowData, copyRowData, otherRowData);
+    testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
+
+    GenericRowData rowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericArrayData(new Integer[] {1, null, 3}));
+    GenericRowData copyRowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericArrayData(new Integer[] {1, null, 3}));
+    // modify the row_id and array field values
+    GenericRowData otherRowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("other_row_id_value"),
+            new GenericArrayData(new Integer[] {4, null, 6}));
+    testEqualsAndHashCode(
+        schema,
+        idOnly,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
+    testEqualsAndHashCode(
+        schema,
+        arrayOnly,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
+    testEqualsAndHashCode(
+        schema,
+        schema,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
   }
 
   @Test
-  public void testNestedListTypeProjection() {
-    Schema schema =
-        new Schema(
-            Types.NestedField.required(0, "id", Types.LongType.get()),
-            Types.NestedField.optional(
-                5,
-                "list",
-                Types.ListType.ofOptional(
-                    4,
-                    Types.StructType.of(
-                        Types.NestedField.required(1, "nestedListField1", Types.LongType.get()),
-                        Types.NestedField.required(2, "nestedListField2", Types.LongType.get()),
-                        Types.NestedField.required(3, "nestedListField3", Types.LongType.get())))));
+  public void testArrayOfStructProjection() {
+    DataGenerator dataGenerator = new DataGenerators.ArrayOfStruct();
+    Schema schema = dataGenerator.icebergSchema();
 
     // Project id only.
-    Schema idOnly = schema.select("id");
+    Schema idOnly = schema.select("row_id");
+    Assertions.assertThat(idOnly.columns().size()).isGreaterThan(0);
     generateAndValidate(schema, idOnly);
 
     // Project list only.
-    Schema mapOnly = schema.select("list");
-    generateAndValidate(schema, mapOnly);
+    Schema arrayOnly = schema.select("array_of_struct");
+    Assertions.assertThat(arrayOnly.columns().size()).isGreaterThan(0);
+    generateAndValidate(schema, arrayOnly);
 
     // Project all.
     generateAndValidate(schema, schema);
@@ -298,40 +489,105 @@ public class TestRowDataProjection {
     Schema partialList =
         new Schema(
             Types.NestedField.optional(
-                5,
-                "list",
+                2,
+                "array_of_struct",
                 Types.ListType.ofOptional(
-                    4,
+                    101,
                     Types.StructType.of(
-                        Types.NestedField.required(2, "nestedListField2", Types.LongType.get())))));
+                        Types.NestedField.required(202, "name", Types.StringType.get())))));
 
     Assertions.assertThatThrownBy(() -> generateAndValidate(schema, partialList))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessageStartingWith("Cannot project a partial list element struct.");
+        .hasMessageContaining("Cannot project a partial list element struct.");
+
+    GenericRowData rowData = dataGenerator.generateFlinkRowData();
+    GenericRowData copyRowData = dataGenerator.generateFlinkRowData();
+    // modify the array field value
+    GenericRowData otherRowData =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"), new GenericArrayData(new Integer[] {4, 5, 6}));
+    testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
+
+    GenericRowData rowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericArrayData(new Integer[] {1, null, 3}));
+    GenericRowData copyRowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericArrayData(new Integer[] {1, null, 3}));
+    // modify the array field value
+    GenericRowData otherRowDataNullOptionalFields =
+        GenericRowData.of(
+            StringData.fromString("row_id_value"),
+            new GenericArrayData(new Integer[] {4, null, 6}));
+    testEqualsAndHashCode(
+        schema,
+        schema,
+        rowDataNullOptionalFields,
+        copyRowDataNullOptionalFields,
+        otherRowDataNullOptionalFields);
   }
 
   private void generateAndValidate(Schema schema, Schema projectSchema) {
     int numRecords = 100;
-    Iterable<Record> recordList = RandomGenericData.generate(schema, numRecords, 102L);
-    Iterable<RowData> rowDataList = RandomRowData.generate(schema, numRecords, 102L);
+    List<Record> recordList = RandomGenericData.generate(schema, numRecords, 102L);
+    List<RowData> rowDataList =
+        Lists.newArrayList(RandomRowData.generate(schema, numRecords, 102L).iterator());
+    Assertions.assertThat(rowDataList).hasSize(recordList.size());
 
     StructProjection structProjection = StructProjection.create(schema, projectSchema);
     RowDataProjection rowDataProjection = RowDataProjection.create(schema, projectSchema);
 
-    Iterator<Record> recordIter = recordList.iterator();
-    Iterator<RowData> rowDataIter = rowDataList.iterator();
-
     for (int i = 0; i < numRecords; i++) {
-      Assert.assertTrue("Should have more records", recordIter.hasNext());
-      Assert.assertTrue("Should have more RowData", rowDataIter.hasNext());
+      StructLike expected = structProjection.wrap(recordList.get(i));
+      RowData projected = rowDataProjection.wrap(rowDataList.get(i));
+      TestHelpers.assertRowData(projectSchema, expected, projected);
+
+      Assertions.assertThat(projected).isEqualTo(projected);
+      Assertions.assertThat(projected).hasSameHashCodeAs(projected);
+      // make sure toString doesn't throw NPE for null values
+      Assertions.assertThatNoException().isThrownBy(projected::toString);
+    }
+  }
 
-      StructLike expected = structProjection.wrap(recordIter.next());
-      RowData actual = rowDataProjection.wrap(rowDataIter.next());
+  private void testEqualsAndHashCode(
+      Schema schema,
+      Schema projectionSchema,
+      RowData rowData,
+      RowData copyRowData,
+      RowData otherRowData) {
+    testEqualsAndHashCode(schema, projectionSchema, rowData, copyRowData, otherRowData, false);
+  }
 
-      TestHelpers.assertRowData(projectSchema, expected, actual);
+  /**
+   * @param isOtherRowDataSameAsRowData sometimes projection on otherRowData can result in the same
+   *     RowData, e.g. due to empty projection or null struct
+   */
+  private void testEqualsAndHashCode(
+      Schema schema,
+      Schema projectionSchema,
+      RowData rowData,
+      RowData copyRowData,
+      RowData otherRowData,
+      boolean isOtherRowDataSameAsRowData) {
+    RowDataProjection projection = RowDataProjection.create(schema, projectionSchema);
+    RowDataProjection copyProjection = RowDataProjection.create(schema, projectionSchema);
+    RowDataProjection otherProjection = RowDataProjection.create(schema, projectionSchema);
+
+    Assertions.assertThat(projection.wrap(rowData)).isEqualTo(copyProjection.wrap(copyRowData));
+    Assertions.assertThat(projection.wrap(rowData))
+        .hasSameHashCodeAs(copyProjection.wrap(copyRowData));
+
+    if (isOtherRowDataSameAsRowData) {
+      Assertions.assertThat(projection.wrap(rowData)).isEqualTo(otherProjection.wrap(otherRowData));
+      Assertions.assertThat(projection.wrap(rowData))
+          .hasSameHashCodeAs(otherProjection.wrap(otherRowData));
+    } else {
+      Assertions.assertThat(projection.wrap(rowData))
+          .isNotEqualTo(otherProjection.wrap(otherRowData));
+      Assertions.assertThat(projection.wrap(rowData))
+          .doesNotHaveSameHashCodeAs(otherProjection.wrap(otherRowData));
     }
-
-    Assert.assertFalse("Shouldn't have more record", recordIter.hasNext());
-    Assert.assertFalse("Shouldn't have more RowData", rowDataIter.hasNext());
   }
 }