Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/08 02:47:04 UTC

[GitHub] [flink] JingsongLi commented on a change in pull request #14572: [FLINK-20877][table-runtime-blink] Refactor BytesHashMap and BytesMultiMap to support window key

JingsongLi commented on a change in pull request #14572:
URL: https://github.com/apache/flink/pull/14572#discussion_r553706628



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
##########
@@ -197,6 +197,11 @@ public BinaryRowData mapFromPages(BinaryRowData reuse, AbstractPagedInputView he
         return reuse;
     }
 
+    @Override
+    public void skipRecordToRead(AbstractPagedInputView source) throws IOException {
+        source.skipBytes(source.readInt());

Review comment:
       Why not `checkSkipReadForFixLengthPart`?
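
The Javadoc of `serializeToPages` in this PR says some implementations, e.g. `BinaryRowData`, may skip bytes when the current page does not have enough space left. That is why a plain `source.skipBytes(source.readInt())` can drift out of sync with the writer. The sketch below is a hypothetical, self-contained model in plain Java. It does not use Flink's `AbstractPagedInputView`, and the names `Cursor`, `writeRecord`, `skipRecord` and `roundTrip` are illustrative only. It assumes a 16-byte page and an 8-byte fixed part (a 4-byte length plus 4 fixed-width bytes) that must not straddle two pages. A reader that skips without repeating the writer's alignment check ends up reading the zero-filled page tail as record lengths.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy paged buffer (hypothetical, not Flink's API): a record's fixed part never straddles pages. */
public class PagedSkipDemo {
    static final int PAGE_SIZE = 16;
    // Assumed fixed part: 4-byte length header + 4 fixed-width payload bytes.
    static final int FIXED_PART = 8;

    /** Cursor over fixed-size pages, used for both writing and reading. */
    static final class Cursor {
        final List<byte[]> pages;
        int page = 0;
        int pos = 0;

        Cursor(List<byte[]> pages) {
            this.pages = pages;
        }

        int available() {
            return PAGE_SIZE - pos;
        }

        void advance() {
            page++;
            pos = 0;
            if (page == pages.size()) {
                pages.add(new byte[PAGE_SIZE]); // allocate a fresh page when writing past the end
            }
        }

        void writeByte(byte b) {
            if (pos == PAGE_SIZE) advance();
            pages.get(page)[pos++] = b;
        }

        byte readByte() {
            if (pos == PAGE_SIZE) advance();
            return pages.get(page)[pos++];
        }

        void writeInt(int v) {
            for (int shift = 24; shift >= 0; shift -= 8) writeByte((byte) (v >>> shift));
        }

        int readInt() {
            int v = 0;
            for (int i = 0; i < 4; i++) v = (v << 8) | (readByte() & 0xFF);
            return v;
        }
    }

    /** Writes [length][payload]; returns how many page-tail bytes were skipped first. */
    static int writeRecord(Cursor out, byte[] payload) {
        int skipped = 0;
        if (out.available() < FIXED_PART) { // the fixed part would straddle two pages
            skipped = out.available();
            out.advance();
        }
        out.writeInt(payload.length);
        for (byte b : payload) out.writeByte(b);
        return skipped;
    }

    /** Reads the length and skips the payload, repeating the writer's check only if asked. */
    static int skipRecord(Cursor in, boolean alignLikeWriter) {
        if (alignLikeWriter && in.available() < FIXED_PART) {
            in.advance();
        }
        int len = in.readInt();
        for (int i = 0; i < len; i++) in.readByte();
        return len;
    }

    /** Writes three 6-byte records, then returns the lengths a reader sees while skipping them. */
    static int[] roundTrip(boolean alignLikeWriter) {
        List<byte[]> pages = new ArrayList<>();
        pages.add(new byte[PAGE_SIZE]);
        Cursor out = new Cursor(pages);
        for (int i = 0; i < 3; i++) writeRecord(out, new byte[] {1, 2, 3, 4, 5, 6});
        Cursor in = new Cursor(pages);
        int[] lengths = new int[3];
        for (int i = 0; i < 3; i++) lengths[i] = skipRecord(in, alignLikeWriter);
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(roundTrip(true)));  // [6, 6, 6]
        System.out.println(Arrays.toString(roundTrip(false))); // [6, 0, 0]
    }
}
```

In this model the aligned reader recovers all three records, while the naive reader misreads the skipped tail of page 0.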

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
##########
@@ -197,6 +197,11 @@ public BinaryRowData mapFromPages(BinaryRowData reuse, AbstractPagedInputView he
         return reuse;
     }
 
+    @Override
+    public void skipRecordToRead(AbstractPagedInputView source) throws IOException {

Review comment:
       Could we pick a better name that makes it clear this works on `pages`?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java
##########
@@ -182,9 +176,8 @@ public void reset() {
      *     BinaryRowData form who has only one MemorySegment.
      * @return {@link LookupInfo}
      */
-    public LookupInfo<V> lookup(BinaryRowData key) {
-        // check the looking up key having only one memory segment
-        checkArgument(key.getSegments().length == 1);
+    public LookupInfo<K, V> lookup(K key) {
+        validateKey(key);

Review comment:
       That is legacy code; we don't need to validate the key anymore.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/PagedTypeSerializer.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.table.data.binary.BinaryRowData;
+
+import java.io.IOException;
+
+/** A type serializer which provides paged serialize and deserialize methods. */
+@Internal
+public abstract class PagedTypeSerializer<T> extends TypeSerializer<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Serializes the given record to the given target paged output view. Some implementations may
+     * skip some bytes if the current page does not have enough space left, e.g. {@link BinaryRowData}.
+     *
+     * @param record The record to serialize.
+     * @param target The output view to write the serialized data to.
+     * @return The number of bytes skipped.
+     * @throws IOException Thrown, if the serialization encountered an I/O related error. Typically
+     *     raised by the output view, which may have an underlying I/O channel to which it
+     *     delegates.
+     */
+    public abstract int serializeToPages(T record, AbstractPagedOutputView target)
+            throws IOException;
+
+    /**
+     * De-serializes a record from the given source paged input view. For consistency with the
+     * serialization format, some implementations may need to skip some bytes of source before
+     * de-serializing, e.g. {@link BinaryRowData}. Typically, the content read from source should be
+     * copied out when de-serializing, and the underlying data from source is not expected to be
+     * reused. If you have such a requirement, see {@link #mapFromPages(AbstractPagedInputView)}.
+     *
+     * @param source The input view from which to read the data.
+     * @return The de-serialized element.
+     * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
+     *     Typically raised by the input view, which may have an underlying I/O channel from which
+     *     it reads.
+     */
+    public abstract T deserializeFromPages(AbstractPagedInputView source) throws IOException;
+
+    /** Reuse version of {@link #deserializeFromPages(AbstractPagedInputView)}. */
+    public abstract T deserializeFromPages(T reuse, AbstractPagedInputView source)
+            throws IOException;
+
+    /**
+     * Maps a record from the given source paged input view. This method makes it possible to
+     * achieve zero copy when de-serializing. Implementations may either copy the content read from
+     * source or not, but zero copy is encouraged.
+     *
+     * <p>If you choose zero copy, you have to manage the lifecycle of the pages properly.
+     *
+     * @param source The input view from which to read the data.
+     * @return The mapped element.
+     * @throws IOException Thrown, if the de-serialization encountered an I/O related error.
+     *     Typically raised by the input view, which may have an underlying I/O channel from which
+     *     it reads.
+     */
+    public abstract T mapFromPages(AbstractPagedInputView source) throws IOException;

Review comment:
       Can you remove this method? It is useless and misleading.
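
The `mapFromPages` Javadoc contrasts copying deserialization with zero-copy mapping and warns that zero copy leaves the caller responsible for the lifecycle of the pages. A minimal sketch of that difference, using plain byte arrays and `java.nio.ByteBuffer` as stand-ins for Flink memory segments (an assumption; `deserializeCopy` and `mapView` are hypothetical names): once a page is recycled and overwritten, a copied record keeps its bytes while a mapped view silently observes the new contents.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical illustration: a copied record survives page reuse; a mapped one does not. */
public class CopyVsMapDemo {
    /** Copying deserialization: the returned record owns its bytes. */
    static byte[] deserializeCopy(byte[] page, int offset, int len) {
        return Arrays.copyOfRange(page, offset, offset + len);
    }

    /** Zero-copy mapping: the returned record is only a view over the page's memory. */
    static ByteBuffer mapView(byte[] page, int offset, int len) {
        // slice() starts the view at `offset`, so index 0 of the view is page[offset].
        return ByteBuffer.wrap(page, offset, len).slice();
    }

    public static void main(String[] args) {
        byte[] page = {10, 20, 30, 40};
        byte[] copied = deserializeCopy(page, 1, 2);
        ByteBuffer mapped = mapView(page, 1, 2);
        System.out.println(mapped.get(0)); // 20

        // Simulate the page being released and reused for other data.
        Arrays.fill(page, (byte) 0);

        System.out.println(Arrays.toString(copied)); // [20, 30]
        System.out.println(mapped.get(0)); // 0
    }
}
```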

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesWindowHashMap.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util.collections.binary;
+
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.typeutils.WindowKeySerializer;
+import org.apache.flink.table.runtime.util.WindowKey;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A binary map in the structure like {@code Map<WindowKey, BinaryRowData>}.
+ *
+ * @see BytesHashMapBase for more information about the binary layout.
+ */
+public final class BytesWindowHashMap extends BytesHashMapBase<WindowKey> {

Review comment:
       NIT: I prefer `WindowBytesHashMap`, corresponding to `AbstractBytesHashMap`

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapBase.java
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util.collections.binary;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
+import org.apache.flink.table.runtime.util.KeyValueIterator;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.MathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Bytes based hash map. It can be used for performing aggregations where the aggregated values are
+ * fixed-width. Because the data is stored in continuous memory, AggBuffer of variable length cannot
+ * be applied to this HashMap. The KeyValue form in hash map is designed to reduce the cost of key
+ * fetching in lookup. The memory is divided into two areas:
+ *
+ * <p>Bucket area: pointer + hashcode.
+ *
+ * <ul>
+ *   <li>Bytes 0 to 4: a pointer to the record in the record area
+ *   <li>Bytes 4 to 8: key's full 32-bit hashcode
+ * </ul>
+ *
+ * <p>Record area: the actual data in linked list records; each record has four parts:
+ *
+ * <ul>
+ *   <li>Bytes 0 to 4: len(k)
+ *   <li>Bytes 4 to 4 + len(k): key data
+ *   <li>Bytes 4 + len(k) to 8 + len(k): len(v)
+ *   <li>Bytes 8 + len(k) to 8 + len(k) + len(v): value data
+ * </ul>
+ *
+ * <p>{@code BytesHashMap} is influenced by Apache Spark's BytesToBytesMap.
+ */
+public abstract class BytesHashMapBase<K> extends BytesMap<K, BinaryRowData> {

Review comment:
       NIT: I prefer `AbstractBytesHashMap`
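
The class Javadoc above describes a bucket area of (pointer, 32-bit hashcode) pairs and a record area of `len(k) | key | len(v) | value` records. A minimal, hypothetical sketch of that layout in plain Java, not Flink's implementation: it assumes a fixed capacity with no resizing, uses linear probing (a technique chosen for this sketch, not taken from the PR), stores records in a flat append-only area rather than linked lists, and assumes the bucket area never fills up. It shows why keeping the full hashcode in the bucket reduces key fetching: most mismatching buckets are rejected without reading the record area.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Toy model (hypothetical) of the bucket area + record area layout described above. */
public class BytesLayoutDemo {
    private static final int EMPTY = -1;
    private static final int BUCKET_SIZE = 8; // [int pointer][int full 32-bit hashcode]

    private final int numBuckets;
    private final ByteBuffer buckets;
    private final ByteBuffer records; // append-only: [len(k)][key][len(v)][value]

    public BytesLayoutDemo(int numBuckets, int recordAreaBytes) {
        this.numBuckets = numBuckets;
        this.buckets = ByteBuffer.allocate(numBuckets * BUCKET_SIZE);
        this.records = ByteBuffer.allocate(recordAreaBytes);
        for (int i = 0; i < numBuckets; i++) {
            buckets.putInt(i * BUCKET_SIZE, EMPTY);
        }
    }

    private byte[] readBytes(int offset, int len) {
        byte[] out = new byte[len];
        for (int i = 0; i < len; i++) out[i] = records.get(offset + i);
        return out;
    }

    private byte[] keyAt(int pointer) {
        return readBytes(pointer + 4, records.getInt(pointer));
    }

    private byte[] valueAt(int pointer) {
        int valueLenOffset = pointer + 4 + records.getInt(pointer);
        return readBytes(valueLenOffset + 4, records.getInt(valueLenOffset));
    }

    /** Returns the bucket holding key, or the first empty bucket on its probe path. */
    private int findBucket(byte[] key, int hash) {
        int slot = Math.floorMod(hash, numBuckets);
        while (true) {
            int pointer = buckets.getInt(slot * BUCKET_SIZE);
            if (pointer == EMPTY) return slot;
            // Compare the stored hash first so most mismatches never read the record area.
            int storedHash = buckets.getInt(slot * BUCKET_SIZE + 4);
            if (storedHash == hash && Arrays.equals(keyAt(pointer), key)) return slot;
            slot = (slot + 1) % numBuckets; // linear probing
        }
    }

    /** Appends a record for key if absent; returns false if the key was already present. */
    public boolean putIfAbsent(byte[] key, byte[] value) {
        int hash = Arrays.hashCode(key);
        int slot = findBucket(key, hash);
        if (buckets.getInt(slot * BUCKET_SIZE) != EMPTY) return false;
        int pointer = records.position();
        records.putInt(key.length).put(key).putInt(value.length).put(value);
        buckets.putInt(slot * BUCKET_SIZE, pointer);
        buckets.putInt(slot * BUCKET_SIZE + 4, hash);
        return true;
    }

    /** Returns the value bytes for key, or null if absent. */
    public byte[] get(byte[] key) {
        int pointer = buckets.getInt(findBucket(key, Arrays.hashCode(key)) * BUCKET_SIZE);
        return pointer == EMPTY ? null : valueAt(pointer);
    }

    public static void main(String[] args) {
        BytesLayoutDemo map = new BytesLayoutDemo(4, 128);
        // Arrays.hashCode gives 32 for {1} and 36 for {5}: both map to bucket 0, so {5} probes to 1.
        map.putIfAbsent(new byte[] {1}, new byte[] {10, 0, 0, 0});
        map.putIfAbsent(new byte[] {5}, new byte[] {50, 0, 0, 0});
        System.out.println(Arrays.toString(map.get(new byte[] {5}))); // [50, 0, 0, 0]
        System.out.println(map.get(new byte[] {9})); // null
    }
}
```

Because the values here are fixed-width, an aggregation could overwrite a value in place at `pointer + 8 + len(k)`; a variable-length value would not fit that scheme, which matches the Javadoc's note about AggBuffer.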




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org