You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/21 00:11:41 UTC

[beam] branch master updated: [CdapIO] HasOffset interface was implemented (#22193)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new caaefc0c0b6 [CdapIO] HasOffset interface was implemented (#22193)
caaefc0c0b6 is described below

commit caaefc0c0b663212a36807b1ebb1d18b76a0ae6c
Author: Andrey <an...@akvelon.com>
AuthorDate: Thu Jul 21 06:11:35 2022 +0600

    [CdapIO] HasOffset interface was implemented (#22193)
    
    * [BEAM-22148] HasOffset interface was implemented
    
    * [BEAM-22148] Formatting was fixed
---
 sdks/java/io/sparkreceiver/build.gradle            |  2 ++
 .../beam/sdk/io/sparkreceiver/HasOffset.java       | 31 ++++++++++++++++++++++
 settings.gradle.kts                                |  1 +
 3 files changed, 34 insertions(+)

diff --git a/sdks/java/io/sparkreceiver/build.gradle b/sdks/java/io/sparkreceiver/build.gradle
index 09f98db1246..f226435631e 100644
--- a/sdks/java/io/sparkreceiver/build.gradle
+++ b/sdks/java/io/sparkreceiver/build.gradle
@@ -41,4 +41,6 @@ dependencies {
     compileOnly "org.scala-lang:scala-library:2.11.12"
     testImplementation project(path: ":sdks:java:io:cdap", configuration: "testRuntimeMigration")
     testImplementation library.java.junit
+    testImplementation project(path: ":runners:direct-java", configuration: "shadow")
+    testImplementation project(path: ":examples:java", configuration: "testRuntimeMigration")
 }
diff --git a/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java b/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
new file mode 100644
index 00000000000..b3c9e11defd
--- /dev/null
+++ b/sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/HasOffset.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+/**
+ * Interface for any Spark {@link org.apache.spark.streaming.receiver.Receiver} that supports
+ * reading from a given start offset up to an end offset.
+ */
+public interface HasOffset {
+
+  /** @param offset inclusive start offset from which reading should begin. */
+  void setStartOffset(Long offset);
+
+  /** @return exclusive end offset up to which the current page will be read. */
+  Long getEndOffset();
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 05ecc694733..5a35c29f2e1 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -194,6 +194,7 @@ include(":sdks:java:io:pulsar")
 include(":sdks:java:io:rabbitmq")
 include(":sdks:java:io:redis")
 include(":sdks:java:io:solr")
+include(":sdks:java:io:sparkreceiver")
 include(":sdks:java:io:snowflake")
 include(":sdks:java:io:snowflake:expansion-service")
 include(":sdks:java:io:splunk")