Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:51:44 UTC

[GitHub] [flink] lgo commented on a change in pull request #12345: [FLINK-17288] [Runtime/StateBackends] Add RocksDB SST ingestion for batch writes

lgo commented on a change in pull request #12345:
URL: https://github.com/apache/flink/pull/12345#discussion_r430484842



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/writer/RocksDBSSTIngestWriter.java
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.writer;
+
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.IOUtils;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link RocksDBSSTIngestWriter} implements {@link RocksDBWriter}, providing writes by creating sst
+ * files and instructing {@link RocksDB} to ingest them (via {@link
+ * RocksDB#ingestExternalFile(ColumnFamilyHandle, List, IngestExternalFileOptions)}).
+ *
+ * <p>{@link RocksDBSSTIngestWriter} supports writing to multiple {@link ColumnFamilyHandle},
+ * assuming the writes within a {@link ColumnFamilyHandle} are ordered.
+ *
+ * <p>It uses {@link RocksDBSSTWriter} for creating the sst file of a {@link ColumnFamilyHandle}.
+ *
+ * <p>IMPORTANT: This class is not thread safe.
+ *
+ * <p>TODO(@lgo): describe in detail what this writer is good for. Note that it only supports
+ * {@code put} operations and offers no more atomicity than the other writers: if a failure
+ * occurs between ingested files (being flushed), the RocksDB state will be left with partial
+ * writes. It is unclear whether this is a problem for Flink, but it should be tested.
+ */
+public class RocksDBSSTIngestWriter implements RocksDBWriter {
+
+	/**
+	 * {@code INGEST_EXTERNAL_FILE_OPTIONS} are the {@link IngestExternalFileOptions} provided to
+	 * RocksDB when calling {@link RocksDB#ingestExternalFile(ColumnFamilyHandle, List,
+	 * IngestExternalFileOptions)}. We only use one set of options, so we use a singleton that is not
+	 * closed.
+	 */
+	// @lgo: because Flink uses RocksDB JNI 5.14, the API for IngestExternalFileOptions is
+	// limited and the builder pattern does not exist, so the flags must be passed positionally.
+	private static final IngestExternalFileOptions INGEST_EXTERNAL_FILE_OPTIONS =
+		new IngestExternalFileOptions(
+			// Move files rather than copying them. Because we are generating
+			// one-off files for import, it does not matter if they are moved
+			// and the import fails.
+			//
+			// @lgo: ensure this does not cause stray RocksDB sst files on a host when ingestion
+			// fails.
+			/* moveFiles */ true,
+
+			// Set to the default (true).
+			/* snapshotConsistency */ true,
+
+			// Set to the default (true).
+			/* allowGlobalSeqNo */ true,
+
+			// By disallowing a blocking flush, ingestion fails loudly when the
+			// file contains keys that overlap the RocksDB memtable.
+			/* allowBlockingFlush */ false);
+
+	/** {@code maxSstSize} is the maximum size of an sst file before flushing it. */
+	private final int maxSstSize;
+
+	/**
+	 * {@code envOptions} are the {@link EnvOptions} provided to the underlying {@link
+	 * RocksDBSSTWriter}.
+	 */
+	// @lgo: plumb the options through the constructor
+	// and set sane defaults.
+	private final EnvOptions envOptions;
+
+	/**
+	 * {@code options} are the {@link Options} provided to the underlying {@link RocksDBSSTWriter}.
+	 */
+	// @lgo: plumb the options through the constructor
+	// and set sane defaults.
+	private final Options options;
+
+	/** {@code db} is the active RocksDB database to write files to. */
+	private final RocksDB db;
+
+	/**
+	 * {@code columnFamilyWriters} contains the currently open {@link RocksDBSSTWriter} for each {@link
+	 * ColumnFamilyHandle}.
+	 */
+	private final HashMap<Integer, RocksDBSSTWriter> columnFamilyWriters = new HashMap<>();
+
+	/**
+	 * {@code ingestionTempDir} is the directory used to write temporary sst files before ingesting
+	 * them into {@link RocksDB}.
+	 */
+	private final File ingestionTempDir;
+
+	public RocksDBSSTIngestWriter(
+		@Nonnull RocksDB rocksDB,
+		@Nonnegative int maxSstSize,
+		@Nullable EnvOptions envOptions,
+		@Nullable Options options,
+		@Nullable File tempDir)
+		throws IOException {
+		this.db = rocksDB;
+		this.maxSstSize = maxSstSize;
+		this.envOptions = envOptions;
+		this.options = options;
+
+		// Set up a temporary directory for writing generated sst files.
+		// Either use the provided temporary directory (e.g., during tests) or create a new one.
+		if (tempDir != null) {
+			this.ingestionTempDir = tempDir;
+		} else {
+			// @lgo: clean up the temporary folder. This may not actually be needed,
+			// considering ingestion will move the temporary sst files out.
+			this.ingestionTempDir = Files.createTempDirectory("rocksdb-sst-writer-temp-").toFile();
+		}
+	}
+
+	public void put(
+		@Nonnull ColumnFamilyHandle columnFamilyHandle, @Nonnull byte[] key, @Nonnull byte[] value)
+		throws RocksDBException, IOException {
+		// Get the sst writer for the column family.
+		RocksDBSSTWriter writer = ensureSSTableWriter(columnFamilyHandle);
+		// Insert the k/v.
+		writer.put(key, value);
+		// Flush and ingest the sst file if it has grown past the maximum size.
+		flushIfNeeded(columnFamilyHandle, writer);
+	}
+
+	private RocksDBSSTWriter ensureSSTableWriter(@Nonnull ColumnFamilyHandle columnFamilyHandle)
+		throws RocksDBException, IOException {
+		// Return an existing writer if there is one; otherwise, prepare a new one.
+		if (columnFamilyWriters.containsKey(columnFamilyHandle.getID())) {
+			return columnFamilyWriters.get(columnFamilyHandle.getID());
+		}
+
+		// Create a new sst file in the temporary folder.
+		final String sstFileName = "ingest_" + new AbstractID() + ".sst";
+		File sstFile = new File(ingestionTempDir, sstFileName);
+
+		// Initialize the sst writer.
+		RocksDBSSTWriter writer =
+			new RocksDBSSTWriter(envOptions, options, columnFamilyHandle, sstFile);
+
+		// Store the new writer for the column family.
+		columnFamilyWriters.put(columnFamilyHandle.getID(), writer);
+
+		return writer;
+	}
+
+	/**
+	 * Flushes the data for a particular {@link RocksDBSSTWriter} into the {@link RocksDB} instance.
+	 * After flushing, the {@link ColumnFamilyHandle} will not have an active {@link
+	 * RocksDBSSTWriter}, and it will need to be initialized by {@link
+	 * #ensureSSTableWriter(ColumnFamilyHandle)}.
+	 *
+	 * @throws RocksDBException if finishing the sst file or ingesting it into RocksDB fails
+	 */
+	private void flushAndCloseWriter(@Nonnull RocksDBSSTWriter writer) throws RocksDBException {
+		// Finish the sst writer.
+		writer.finish();
+
+		// Instruct RocksDB to ingest the sst file. Because the ingested files are each
+		// for a different column family and the JNI SstFileWriter does not allow setting
+		// the ColumnFamilyHandle when writing the file, we need to call ingestExternalFile
+		// for one sst file at a time.
+		//
+		// Because the IngestExternalFileOptions specifies to move the sst file, we do not need
+		// to clean up the written file.
+		List<String> files = Collections.singletonList(writer.getFile().getAbsolutePath());
+		db.ingestExternalFile(writer.getColumnFamilyHandle(), files, INGEST_EXTERNAL_FILE_OPTIONS);

Review comment:
       TODO: I believe this needs `ColumnFamilyOptions` to be passed through. If not here, somewhere else on this write-path (such as via `Options`).
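       A minimal sketch of one way to plumb that through, assuming the caller still holds
       the `ColumnFamilyOptions` each column family was created with (the helper name below
       is hypothetical, not something in this PR):

           import org.rocksdb.ColumnFamilyOptions;
           import org.rocksdb.DBOptions;
           import org.rocksdb.Options;

           /**
            * Hypothetical helper: derives the SstFileWriter {@link Options} from the
            * options the target column family was created with, so the generated sst
            * files match the column family's comparator, table format, and compression.
            */
           static Options sstWriterOptionsFor(DBOptions dbOptions, ColumnFamilyOptions cfOptions) {
               // Options(DBOptions, ColumnFamilyOptions) combines the DB-wide and
               // per-column-family settings into a single Options instance.
               return new Options(dbOptions, cfOptions);
           }

       The resulting `Options` could then be handed to each `RocksDBSSTWriter` in place of
       the single `options` field, built once per column family rather than once per writer.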


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org