You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/02/21 23:36:59 UTC
[iceberg] branch master updated: Spark: Add StreamingOffset for
structured streaming reader (#2092)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 91ac421 Spark: Add StreamingOffset for structured streaming reader (#2092)
91ac421 is described below
commit 91ac42174e4c535ece4e36db2cb587a23babced9
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Feb 22 07:36:49 2021 +0800
Spark: Add StreamingOffset for structured streaming reader (#2092)
---
.../iceberg/spark/source/StreamingOffset.java | 135 +++++++++++++++++++++
.../iceberg/spark/source/TestStreamingOffset.java | 54 +++++++++
2 files changed, 189 insertions(+)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
new file mode 100644
index 0000000..d2bb22d
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.JsonUtil;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+
+/**
+ * An implementation of Spark Structured Streaming {@link Offset} that tracks the currently
+ * processed files of an Iceberg table.
+ *
+ * <p>An offset is serialized by {@link #json()} and restored by {@link #fromJson(String)} as a
+ * JSON object with the fields {@code version}, {@code snapshot_id}, {@code position} and
+ * {@code scan_all_files}.
+ */
+class StreamingOffset extends Offset {
+  /** Offset constructed with snapshot id -1, position -1 and scanAllFiles false. */
+  static final StreamingOffset START_OFFSET = new StreamingOffset(-1L, -1, false);
+
+  // Version written by json() and the only version accepted by fromJson().
+  private static final int CURR_VERSION = 1;
+
+  // JSON field names.
+  private static final String VERSION = "version";
+  private static final String SNAPSHOT_ID = "snapshot_id";
+  private static final String POSITION = "position";
+  private static final String SCAN_ALL_FILES = "scan_all_files";
+
+  private final long snapshotId;
+  private final long position;
+  private final boolean scanAllFiles;
+
+  /**
+   * Creates an offset from its three components.
+   *
+   * @param snapshotId The current processed snapshot id.
+   * @param position The position of last scanned file in snapshot.
+   * @param scanAllFiles whether to scan all files in a snapshot; for example, to read
+   *                     all data when starting a stream.
+   */
+  StreamingOffset(long snapshotId, long position, boolean scanAllFiles) {
+    this.snapshotId = snapshotId;
+    this.position = position;
+    this.scanAllFiles = scanAllFiles;
+  }
+
+  /**
+   * Parses an offset from its JSON string form.
+   *
+   * @param json the serialized offset; must not be null
+   * @return the offset described by {@code json}
+   * @throws IllegalArgumentException if the string cannot be read as JSON or its version is not
+   *                                  the one supported by this class
+   */
+  static StreamingOffset fromJson(String json) {
+    Preconditions.checkNotNull(json, "Cannot parse StreamingOffset JSON: null");
+
+    try {
+      JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
+
+      // The offset was created with a version number; validate it before reading anything else.
+      int version = JsonUtil.getInt(VERSION, node);
+      Preconditions.checkArgument(version == CURR_VERSION,
+          "Cannot parse offset JSON: offset version %s is not supported", version);
+
+      long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
+      // position is declared and serialized as a long, so it is read back as a long as well;
+      // an int cannot represent positions above Integer.MAX_VALUE.
+      long position = JsonUtil.getLong(POSITION, node);
+      boolean shouldScanAllFiles = JsonUtil.getBool(SCAN_ALL_FILES, node);
+
+      return new StreamingOffset(snapshotId, position, shouldScanAllFiles);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
+    }
+  }
+
+  /**
+   * Serializes this offset to a JSON object string.
+   *
+   * <p>Fields are written in the order version, snapshot_id, position, scan_all_files.
+   *
+   * @return the JSON representation of this offset
+   * @throws UncheckedIOException if writing the JSON fails
+   */
+  @Override
+  public String json() {
+    StringWriter writer = new StringWriter();
+    try {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
+      generator.writeStartObject();
+      generator.writeNumberField(VERSION, CURR_VERSION);
+      generator.writeNumberField(SNAPSHOT_ID, snapshotId);
+      generator.writeNumberField(POSITION, position);
+      generator.writeBooleanField(SCAN_ALL_FILES, scanAllFiles);
+      generator.writeEndObject();
+      // Push any buffered output into the writer before it is read below.
+      generator.flush();
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to write StreamingOffset to json", e);
+    }
+
+    return writer.toString();
+  }
+
+  /** Returns the snapshot id stored in this offset. */
+  long snapshotId() {
+    return snapshotId;
+  }
+
+  /** Returns the position stored in this offset. */
+  long position() {
+    return position;
+  }
+
+  /** Returns the scanAllFiles flag stored in this offset. */
+  boolean shouldScanAllFiles() {
+    return scanAllFiles;
+  }
+
+  /** Two offsets are equal when snapshot id, position and the scanAllFiles flag all match. */
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof StreamingOffset)) {
+      return false;
+    }
+
+    StreamingOffset that = (StreamingOffset) obj;
+    return that.snapshotId == snapshotId &&
+        that.position == position &&
+        that.scanAllFiles == scanAllFiles;
+  }
+
+  @Override
+  public int hashCode() {
+    // Hashes the same three fields that equals() compares.
+    return Objects.hashCode(snapshotId, position, scanAllFiles);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Streaming Offset[%d: position (%d) scan_all_files (%b)]",
+        snapshotId, position, scanAllFiles);
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset.java
new file mode 100644
index 0000000..d6f8310
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestStreamingOffset.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Arrays;
+import org.apache.iceberg.util.JsonUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit tests for the JSON serialization of {@link StreamingOffset}. */
+public class TestStreamingOffset {
+
+  /** Every offset must survive a json() / fromJson() round trip unchanged. */
+  @Test
+  public void testJsonConversion() {
+    StreamingOffset[] expected = new StreamingOffset[]{
+        new StreamingOffset(System.currentTimeMillis(), 1L, false),
+        new StreamingOffset(System.currentTimeMillis(), 2L, false),
+        new StreamingOffset(System.currentTimeMillis(), 3L, false),
+        new StreamingOffset(System.currentTimeMillis(), 4L, true)
+    };
+
+    Object[] roundTripped = Arrays.stream(expected)
+        .map(elem -> StreamingOffset.fromJson(elem.json()))
+        .toArray();
+
+    Assert.assertArrayEquals("StreamingOffsets should match", expected, roundTripped);
+  }
+
+  /** json() must produce exactly the string obtained from a hand-built reference object. */
+  @Test
+  public void testToJson() throws Exception {
+    StreamingOffset offset = new StreamingOffset(System.currentTimeMillis(), 1L, false);
+
+    // Reference JSON built independently of the code under test. The comparison is on the
+    // serialized strings, so the fields are added in the same order json() writes them.
+    ObjectNode expected = JsonUtil.mapper().createObjectNode();
+    expected.put("version", 1);
+    expected.put("snapshot_id", offset.snapshotId());
+    expected.put("position", 1L);
+    expected.put("scan_all_files", false);
+
+    String expectedJson = JsonUtil.mapper().writeValueAsString(expected);
+    String actualJson = offset.json();
+
+    // assertEquals takes (message, expected, actual); the reference value goes first so a
+    // failure reports the two sides correctly.
+    Assert.assertEquals("Json should match", expectedJson, actualJson);
+  }
+}