You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/02/21 23:36:59 UTC

[iceberg] branch master updated: Spark: Add StreamingOffset for structured streaming reader (#2092)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 91ac421  Spark: Add StreamingOffset for structured streaming reader (#2092)
91ac421 is described below

commit 91ac42174e4c535ece4e36db2cb587a23babced9
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Feb 22 07:36:49 2021 +0800

    Spark: Add StreamingOffset for structured streaming reader (#2092)
---
 .../iceberg/spark/source/StreamingOffset.java      | 135 +++++++++++++++++++++
 .../iceberg/spark/source/TestStreamingOffset.java  |  54 +++++++++
 2 files changed, 189 insertions(+)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
new file mode 100644
index 0000000..d2bb22d
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+
+/**
+ * An implementation of Spark Structured Streaming {@link Offset} that tracks the files of an
+ * Iceberg table that have been processed so far.
+ * <p>
+ * An offset consists of a snapshot id, the position of the last scanned file in that snapshot,
+ * and a flag indicating whether all files in the snapshot should be scanned. Offsets are
+ * serialized to JSON by {@link #json()} together with a format version, and parsed back by
+ * {@link #fromJson(String)}, which rejects unsupported versions.
+ */
+class StreamingOffset extends Offset {
+  /** Initial offset; uses -1 placeholders for the snapshot id and position. */
+  static final StreamingOffset START_OFFSET = new StreamingOffset(-1L, -1, false);
+
+  private static final int CURR_VERSION = 1;
+  private static final String VERSION = "version";
+  private static final String SNAPSHOT_ID = "snapshot_id";
+  private static final String POSITION = "position";
+  private static final String SCAN_ALL_FILES = "scan_all_files";
+
+  private final long snapshotId;
+  private final long position;
+  private final boolean scanAllFiles;
+
+  /**
+   * Creates an offset.
+   *
+   * @param snapshotId             The current processed snapshot id.
+   * @param position               The position of last scanned file in snapshot.
+   * @param scanAllFiles           whether to scan all files in a snapshot; for example, to read
+   *                               all data when starting a stream.
+   */
+  StreamingOffset(long snapshotId, long position, boolean scanAllFiles) {
+    this.snapshotId = snapshotId;
+    this.position = position;
+    this.scanAllFiles = scanAllFiles;
+  }
+
+  /**
+   * Parses an offset from the JSON form produced by {@link #json()}.
+   *
+   * @param json JSON string produced by {@link #json()}
+   * @return the parsed offset
+   * @throws NullPointerException if {@code json} is null
+   * @throws IllegalArgumentException if the string cannot be parsed as JSON or its offset
+   *                                  version is not supported
+   */
+  static StreamingOffset fromJson(String json) {
+    Preconditions.checkNotNull(json, "Cannot parse StreamingOffset JSON: null");
+
+    try {
+      JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
+      // Offsets are written with a format version; reject versions this class does not know how
+      // to read rather than risk misinterpreting the remaining fields.
+      int version = JsonUtil.getInt(VERSION, node);
+      Preconditions.checkArgument(version == CURR_VERSION,
+          "Cannot parse offset JSON: offset version %s is not supported", version);
+
+      long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
+      // position is a long field and json() writes it as a long; reading it as an int could
+      // truncate or reject positions beyond Integer.MAX_VALUE.
+      long position = JsonUtil.getLong(POSITION, node);
+      boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node);
+
+      return new StreamingOffset(snapshotId, position, shouldScanAllFiles);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
+    }
+  }
+
+  /**
+   * Serializes this offset to a JSON object with version, snapshot_id, position and
+   * scan_all_files fields.
+   *
+   * @return the JSON representation of this offset
+   * @throws UncheckedIOException if the JSON cannot be written
+   */
+  @Override
+  public String json() {
+    StringWriter writer = new StringWriter();
+    // try-with-resources guarantees the generator is closed even if a write fails.
+    try (JsonGenerator generator = JsonUtil.factory().createGenerator(writer)) {
+      generator.writeStartObject();
+      generator.writeNumberField(VERSION, CURR_VERSION);
+      generator.writeNumberField(SNAPSHOT_ID, snapshotId);
+      generator.writeNumberField(POSITION, position);
+      generator.writeBooleanField(SCAN_ALL_FILES, scanAllFiles);
+      generator.writeEndObject();
+      generator.flush();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to write StreamingOffset to json", e);
+    }
+
+    return writer.toString();
+  }
+
+  /** Returns the id of the snapshot this offset refers to. */
+  long snapshotId() {
+    return snapshotId;
+  }
+
+  /** Returns the position of the last scanned file in the snapshot. */
+  long position() {
+    return position;
+  }
+
+  /** Returns whether all files in the snapshot should be scanned. */
+  boolean shouldScanAllFiles() {
+    return scanAllFiles;
+  }
+
+  /** Two offsets are equal when snapshot id, position and scan-all-files flag all match. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof StreamingOffset) {
+      StreamingOffset offset = (StreamingOffset) obj;
+      return offset.snapshotId == snapshotId &&
+          offset.position == position &&
+          offset.scanAllFiles == scanAllFiles;
+    } else {
+      return false;
+    }
+  }
+
+  /** Hashes the same fields compared by {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(snapshotId, position, scanAllFiles);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Streaming Offset[%d: position (%d) scan_all_files (%b)]",
+        snapshotId, position, scanAllFiles);
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset.java
new file mode 100644
index 0000000..d6f8310
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Arrays;
+import org.apache.iceberg.util.JsonUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestStreamingOffset {
+
+  /** Round-trips several offsets through json() and fromJson() and checks they compare equal. */
+  @Test
+  public void testJsonConversion() {
+    StreamingOffset[] expected = new StreamingOffset[]{
+        new StreamingOffset(System.currentTimeMillis(), 1L, false),
+        new StreamingOffset(System.currentTimeMillis(), 2L, false),
+        new StreamingOffset(System.currentTimeMillis(), 3L, false),
+        new StreamingOffset(System.currentTimeMillis(), 4L, true)
+    };
+    Assert.assertArrayEquals("StreamingOffsets should match", expected,
+        Arrays.stream(expected).map(elem -> StreamingOffset.fromJson(elem.json())).toArray());
+  }
+
+  /** Checks that json() produces the same string as a hand-built reference JSON object. */
+  @Test
+  public void testToJson() throws Exception {
+    StreamingOffset offset = new StreamingOffset(System.currentTimeMillis(), 1L, false);
+    // The hand-built node is the reference (expected); the offset's own output is under test.
+    ObjectNode expected = JsonUtil.mapper().createObjectNode();
+    expected.put("version", 1);
+    expected.put("snapshot_id", offset.snapshotId());
+    expected.put("position", 1L);
+    expected.put("scan_all_files", false);
+    String expectedJson = JsonUtil.mapper().writeValueAsString(expected);
+    String actualJson = offset.json();
+    Assert.assertEquals("Json should match", expectedJson, actualJson);
+  }
+}