Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/20 21:38:34 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #15505: [BEAM-12697] Add abstract buffer for SBE extension

TheNeuralBit commented on a change in pull request #15505:
URL: https://github.com/apache/beam/pull/15505#discussion_r711359379



##########
File path: sdks/java/extensions/sbe/src/main/java/org/apache/beam/sdk/extensions/sbe/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Extension for working with PCollections of SBE messages. */

Review comment:
       Maybe add a note here that the entire package is `@Experimental`. Is it possible to just put the annotation on the package too?
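
Whether an annotation can sit on a package declaration depends on its `@Target` meta-annotation: it must list `ElementType.PACKAGE`, or have no `@Target` at all. The sketch below shows one way to check that with reflection. `PackageCapable` and `TypeOnly` are hypothetical stand-ins, not Beam classes, and the targets actually declared on Beam's `@Experimental` are not verified here.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

public class PackageTargetCheck {

  /** Hypothetical annotation that lists PACKAGE among its targets. */
  @Retention(RetentionPolicy.CLASS)
  @Target({ElementType.TYPE, ElementType.METHOD, ElementType.PACKAGE})
  public @interface PackageCapable {}

  /** Hypothetical annotation restricted to types and methods. */
  @Retention(RetentionPolicy.CLASS)
  @Target({ElementType.TYPE, ElementType.METHOD})
  public @interface TypeOnly {}

  /** Returns true if the annotation type may be written on a package declaration. */
  public static boolean allowsPackage(Class<? extends Annotation> annotationType) {
    // @Target itself has RUNTIME retention, so it is visible through reflection.
    Target target = annotationType.getAnnotation(Target.class);
    if (target == null) {
      // Without @Target the annotation is applicable to package declarations
      // (among other declaration contexts).
      return true;
    }
    return Arrays.asList(target.value()).contains(ElementType.PACKAGE);
  }

  public static void main(String[] args) {
    System.out.println(allowsPackage(PackageCapable.class)); // true
    System.out.println(allowsPackage(TypeOnly.class)); // false
  }
}
```

If the check passes for the real annotation, the annotation line would go directly above the `package` statement in `package-info.java`; if it does not, a Javadoc note on the package is the fallback.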

##########
File path: sdks/java/extensions/sbe/build.gradle
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+  automaticModuleName: 'org.apache.beam.sdk.extensions.sbe',
+)
+
+description = "Apache Beam :: SDKs :: Java :: Extensions :: SBE"
+ext.summary = "Add support to Beam for FIX SBE"
+
+dependencies {
+    implementation 'com.google.errorprone:error_prone_annotations:2.3.4'
+    implementation 'org.agrona:agrona:1.12.0'
+    implementation 'org.apache.beam:beam-vendor-guava-26_0-jre:0.1'
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    testImplementation "junit:junit:4.13"

Review comment:
       Version numbers for shared dependencies should come from the ones defined in `BeamModulePlugin.groovy`: https://github.com/apache/beam/blob/8072cc0bcfd4eee08a95902e13b9bf1dc2338693/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L482
   
   For example, the protobuf extension does this: https://github.com/apache/beam/blob/8072cc0bcfd4eee08a95902e13b9bf1dc2338693/sdks/java/extensions/protobuf/build.gradle#L36-L37

##########
File path: sdks/java/extensions/sbe/src/main/java/org/apache/beam/sdk/extensions/sbe/DirectByteBuffer.java
##########
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sbe;
+
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.errorprone.annotations.DoNotCall;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import javax.annotation.Nonnull;
+import org.agrona.DirectBuffer;
+import org.agrona.MutableDirectBuffer;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A default implementation for {@link DirectBuffer} that is designed for use in Beam types.
+ *
+ * <p>This fully implements reading data. Since data cannot be modified, it makes no attempt to
+ * synchronize threads. If a concrete class needs to also implement {@link MutableDirectBuffer},
+ * guaranteeing thread-safety is the responsibility of that class.
+ *
+ * <p>To better guarantee PCollection safety, {@link DirectByteBuffer} does not implement the
+ * following {@link DirectBuffer} methods:
+ *
+ * <ul>
+ *   <li>Any {@code wrap} method.
+ *   <li>{@link DirectBuffer#addressOffset()}
+ *   <li>{@link DirectBuffer#wrapAdjustment()}
+ *   <li>{@link DirectBuffer#byteArray()}
+ *   <li>{@link DirectBuffer#byteBuffer()}
+ * </ul>
+ *
+ * <p>The last two can still be gotten via copy rather than by accessing the underlying buffer
+ * directly.
+ *
+ * <p>Instances can be created in either copy mode or view mode. Copy mode will copy any passed-in
+ * data to an internal buffer, and view mode will be backed directly by the passed-in data.
+ * Regardless of the mode, the 0 index of all reads and writes will be the {@link
+ * ByteBuffer#position()} value at the time of instance creation. This offset will not change even
+ * if the position is changed externally while in view mode. Reads are allowed in the range [0,
+ * limit), and writes are allowed in the range [0, capacity) regardless of mode. The limit can
+ * change if the implementation allows writing. The capacity is always the same as the passed-in
+ * {@link ByteBuffer}.
+ *
+ * <p>Implementations should use {@link DirectByteBuffer#DEFAULT_BYTE_ORDER} to determine the order
+ * of bytes. If a passed-in {@link ByteOrder} is different, then the bytes should be reversed before
+ * writes or after reads.
+ *
+ * <p>Implementations should implement {@link Object#equals(Object)} and {@link Object#hashCode()}.
+ * Implementations are required to implement {@link Comparable#compareTo(Object)}.
+ */
+@Experimental(Kind.EXTENSION)
+public abstract class DirectByteBuffer implements DirectBuffer {
+
+  protected static final String UNSAFE_FOR_PCOLLECTION = "Unsafe for PCollection";
+
+  /** Order of bytes in {@link DirectByteBuffer#buffer}. */
+  protected static final ByteOrder DEFAULT_BYTE_ORDER = ByteOrder.BIG_ENDIAN;
+
+  @Nonnull protected final ByteBuffer buffer;
+  protected final int offset;
+  protected int length; // Length might change if implementation allows writing to the buffer
+  protected final int capacity;
+
+  /**
+   * Creates a new instance that handles {@code buffer} according to {@code mode}.
+   *
+   * @param buffer the {@link ByteBuffer} to use to set the underlying data
+   * @param mode how {@code buffer} should be used to set the underlying data
+   */
+  protected DirectByteBuffer(@Nonnull ByteBuffer buffer, CreateMode mode) {
+    this.length = buffer.limit() - buffer.position();
+    if (mode == CreateMode.COPY) {
+      this.buffer = createCopyOfByteBuffer(buffer, buffer.position(), this.length);
+      this.offset = 0;
+    } else {

Review comment:
       Could you make this explicitly check for `VIEW`, and throw an exception in the else case?
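
A minimal sketch of what the explicit check could look like. `ExplicitModeCheck` is a hypothetical stand-in for the class in the diff. `COPY` appears in the diff and `VIEW` in the comment above, but the enum definition and the body of the view branch are assumptions, since the hunk is cut off before the else body. The copy logic here is also a stand-in for `createCopyOfByteBuffer`, whose implementation is not shown.

```java
import java.nio.ByteBuffer;

public class ExplicitModeCheck {

  /** Assumed shape of the enum; only the two names come from the review thread. */
  public enum CreateMode {
    COPY,
    VIEW
  }

  public final ByteBuffer buffer;
  public final int offset;
  public final int length;

  public ExplicitModeCheck(ByteBuffer source, CreateMode mode) {
    this.length = source.limit() - source.position();
    if (mode == CreateMode.COPY) {
      // Copy the readable bytes; duplicate() leaves source's position untouched.
      ByteBuffer copy = ByteBuffer.allocate(this.length);
      copy.put(source.duplicate());
      copy.flip();
      this.buffer = copy;
      this.offset = 0;
    } else if (mode == CreateMode.VIEW) {
      // Assumption: view mode keeps the caller's buffer and remembers its position.
      this.buffer = source;
      this.offset = source.position();
    } else {
      // Reached for null today, and for any enum constant added later.
      throw new IllegalArgumentException("Unsupported CreateMode: " + mode);
    }
  }
}
```

Because the final branch throws, the compiler still treats the `final` fields as definitely assigned. A new `CreateMode` constant then fails loudly instead of silently behaving like a view.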