You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@zookeeper.apache.org by yisong-yue <gi...@git.apache.org> on 2018/11/07 07:28:21 UTC
[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...
GitHub user yisong-yue opened a pull request:
https://github.com/apache/zookeeper/pull/690
ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yisong-yue/zookeeper ZOOKEEPER-3179
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/zookeeper/pull/690.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #690
----
commit 25ead63f620b99571850fdd7f827d685c79e3214
Author: Yisong Yue <yi...@...>
Date: 2018-11-07T05:04:32Z
ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO
commit 9fd3071e1487044f77172765222fe6f520961946
Author: Yisong Yue <yi...@...>
Date: 2018-11-07T07:24:34Z
small improvements
----
---
[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...
Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/690#discussion_r232363306
--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyCodec;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * Represent the Stream used in serialize and deserialize the Snapshot.
+ */
+public class SnapStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
+
+ public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
+ "zookeeper.snapshot.compression.method";
+
+ private static StreamMode streamMode =
+ StreamMode.fromString(
+ System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
+ StreamMode.DEFAULT_MODE.getName()));
+
+ static {
+ LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
+ }
+
+ public static enum StreamMode {
+ GZIP("gz"),
+ SNAPPY("snappy"),
+ CHECKED("");
+
+ public static final StreamMode DEFAULT_MODE = CHECKED;
+
+ private String name;
+
+ StreamMode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFileExtension() {
+ return name.isEmpty() ? "" : "." + name;
+ }
+
+ public static StreamMode fromString(String name) {
+ for (StreamMode c : values()) {
+ if (c.getName().compareToIgnoreCase(name) == 0) {
+ return c;
+ }
+ }
+ return DEFAULT_MODE;
+ }
+ }
+
+ /**
+ * Return the CheckedInputStream based on the extension of the fileName.
+ *
+ * @param fileName the file the InputStream read from
+ * @return the specific InputStream
+ * @throws IOException
+ */
+ public static CheckedInputStream getInputStream(File file) throws IOException {
+ FileInputStream fis = new FileInputStream(file);
+ InputStream is;
+ switch (getStreamMode(file.getName())) {
+ case GZIP:
+ is = new GZIPInputStream(fis);
+ break;
+ case SNAPPY:
+ is = new SnappyInputStream(fis);
+ break;
+ case CHECKED:
+ default:
+ is = new BufferedInputStream(fis);
+ }
+ return new CheckedInputStream(is, new Adler32());
+ }
+
+ /**
+ * Return the OutputStream based on predefined stream mode.
+ *
+ * @param fileName the file the OutputStream writes to
+ * @return the specific OutputStream
+ * @throws IOException
+ */
+ public static CheckedOutputStream getOutputStream(File file) throws IOException {
+ FileOutputStream fos = new FileOutputStream(file);
+ OutputStream os;
+ switch (streamMode) {
+ case GZIP:
+ os = new GZIPOutputStream(fos);
+ break;
+ case SNAPPY:
+ os = new SnappyOutputStream(fos);
+ break;
+ case CHECKED:
+ default:
+ os = new BufferedOutputStream(fos);
+ }
+ return new CheckedOutputStream(os, new Adler32());
+ }
+
+ /**
+ * Write specific seal to the OutputArchive and close the OutputStream.
+ * Currently, only CheckedOutputStream will write it's checkSum to the
+ * end of the stream.
+ *
+ */
+ public static void sealStream(CheckedOutputStream os, OutputArchive oa)
+ throws IOException {
+ long val = os.getChecksum().getValue();
+ oa.writeLong(val, "val");
+ oa.writeString("/", "path");
+ }
+
+ /**
+ * Verify the integrity of the seal, only CheckedInputStream will verify
+ * the checkSum of the content.
+ *
+ */
+ static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
+ throws IOException {
+ long checkSum = is.getChecksum().getValue();
+ long val = ia.readLong("val");
+ if (val != checkSum) {
+ throw new IOException("CRC corruption");
+ }
+ }
+
+ /**
+ * Verifies that the file is a valid snapshot. Snapshot may be invalid if
+ * it's incomplete as in a situation when the server dies while in the
+ * process of storing a snapshot. Any files that are improperly formated
+ * or corrupted are invalid. Any file that is not a snapshot is also an
+ * invalid snapshot.
+ *
+ * @param file file to verify
+ * @return true if the snapshot is valid
+ * @throws IOException
+ */
+ public static boolean isValidSnapshot(File file) throws IOException {
+ if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
+ return false;
+ }
+
+ String fileName = file.getName();
+ if (Util.getZxidFromName(fileName, "snapshot") == -1) {
+ return false;
+ }
+
+ boolean isValid = false;
+ switch (getStreamMode(fileName)) {
+ case GZIP:
+ isValid = isValidGZipStream(file);
+ break;
+ case SNAPPY:
+ isValid = isValidSnappyStream(file);
+ break;
+ case CHECKED:
+ default:
+ isValid = isValidCheckedStream(file);
+ }
+ return isValid;
+ }
+
+ public static void setStreamMode(StreamMode mode) {
+ streamMode = mode;
+ }
+
+ public static StreamMode getStreamMode() {
+ return streamMode;
+ }
+
+ /**
+ * Detect the stream mode from file name extension
+ *
+ * @param fileName
+ * @return
+ */
+ public static StreamMode getStreamMode(String fileName) {
+ String[] splitSnapName = fileName.split("\\.");
+
+ // Use file extension to detect format
+ if (splitSnapName.length > 1) {
+ String mode = splitSnapName[splitSnapName.length - 1];
+ return StreamMode.fromString(mode);
+ }
+
+ return StreamMode.CHECKED;
+ }
+
+ /**
+ * Certify the GZip stream integrity by checking the header
+ * for the GZip magic string
+ *
+ * @param f file to verify
+ * @return true if it has the correct GZip magic string
+ * @throws IOException
+ */
+ private static boolean isValidGZipStream(File f) throws IOException {
+ FileInputStream fis = null;
+ byte[] byteArray = new byte[2];
+
+ try {
+ fis = new FileInputStream(f);
+ fis.read(byteArray, 0, 2);
--- End diff --
We can also use try-with-resources
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/zookeeper/pull/690
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2608/
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:
https://github.com/apache/zookeeper/pull/690
squashed two commits into one.
---
[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...
Posted by nkalmar <gi...@git.apache.org>.
Github user nkalmar commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/690#discussion_r232326140
--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyCodec;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * Represent the Stream used in serialize and deserialize the Snapshot.
+ */
+public class SnapStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
+
+ public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
+ "zookeeper.snapshot.compression.method";
+
+ private static StreamMode streamMode =
+ StreamMode.fromString(
+ System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
+ StreamMode.DEFAULT_MODE.getName()));
+
+ static {
+ LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
+ }
+
+ public static enum StreamMode {
+ GZIP("gz"),
+ SNAPPY("snappy"),
+ CHECKED("");
+
+ public static final StreamMode DEFAULT_MODE = CHECKED;
+
+ private String name;
+
+ StreamMode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFileExtension() {
+ return name.isEmpty() ? "" : "." + name;
+ }
+
+ public static StreamMode fromString(String name) {
+ for (StreamMode c : values()) {
+ if (c.getName().compareToIgnoreCase(name) == 0) {
+ return c;
+ }
+ }
+ return DEFAULT_MODE;
+ }
+ }
+
+ /**
+ * Return the CheckedInputStream based on the extension of the fileName.
+ *
+ * @param fileName the file the InputStream read from
+ * @return the specific InputStream
+ * @throws IOException
+ */
+ public static CheckedInputStream getInputStream(File file) throws IOException {
+ FileInputStream fis = new FileInputStream(file);
+ InputStream is;
+ switch (getStreamMode(file.getName())) {
+ case GZIP:
+ is = new GZIPInputStream(fis);
+ break;
+ case SNAPPY:
+ is = new SnappyInputStream(fis);
+ break;
+ case CHECKED:
+ default:
+ is = new BufferedInputStream(fis);
+ }
+ return new CheckedInputStream(is, new Adler32());
+ }
+
+ /**
+ * Return the OutputStream based on predefined stream mode.
+ *
+ * @param fileName the file the OutputStream writes to
+ * @return the specific OutputStream
+ * @throws IOException
+ */
+ public static CheckedOutputStream getOutputStream(File file) throws IOException {
+ FileOutputStream fos = new FileOutputStream(file);
+ OutputStream os;
+ switch (streamMode) {
+ case GZIP:
+ os = new GZIPOutputStream(fos);
+ break;
+ case SNAPPY:
+ os = new SnappyOutputStream(fos);
+ break;
+ case CHECKED:
+ default:
+ os = new BufferedOutputStream(fos);
+ }
+ return new CheckedOutputStream(os, new Adler32());
+ }
+
+ /**
+ * Write specific seal to the OutputArchive and close the OutputStream.
+ * Currently, only CheckedOutputStream will write it's checkSum to the
+ * end of the stream.
+ *
+ */
+ public static void sealStream(CheckedOutputStream os, OutputArchive oa)
+ throws IOException {
+ long val = os.getChecksum().getValue();
+ oa.writeLong(val, "val");
+ oa.writeString("/", "path");
+ }
+
+ /**
+ * Verify the integrity of the seal, only CheckedInputStream will verify
+ * the checkSum of the content.
+ *
+ */
+ static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
+ throws IOException {
+ long checkSum = is.getChecksum().getValue();
+ long val = ia.readLong("val");
+ if (val != checkSum) {
+ throw new IOException("CRC corruption");
+ }
+ }
+
+ /**
+ * Verifies that the file is a valid snapshot. Snapshot may be invalid if
+ * it's incomplete as in a situation when the server dies while in the
+ * process of storing a snapshot. Any files that are improperly formated
+ * or corrupted are invalid. Any file that is not a snapshot is also an
+ * invalid snapshot.
+ *
+ * @param file file to verify
+ * @return true if the snapshot is valid
+ * @throws IOException
+ */
+ public static boolean isValidSnapshot(File file) throws IOException {
+ if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
+ return false;
+ }
+
+ String fileName = file.getName();
+ if (Util.getZxidFromName(fileName, "snapshot") == -1) {
+ return false;
+ }
+
+ boolean isValid = false;
+ switch (getStreamMode(fileName)) {
+ case GZIP:
+ isValid = isValidGZipStream(file);
+ break;
+ case SNAPPY:
+ isValid = isValidSnappyStream(file);
+ break;
+ case CHECKED:
+ default:
+ isValid = isValidCheckedStream(file);
+ }
+ return isValid;
+ }
+
+ public static void setStreamMode(StreamMode mode) {
+ streamMode = mode;
+ }
+
+ public static StreamMode getStreamMode() {
+ return streamMode;
+ }
+
+ /**
+ * Detect the stream mode from file name extension
+ *
+ * @param fileName
+ * @return
+ */
+ public static StreamMode getStreamMode(String fileName) {
+ String[] splitSnapName = fileName.split("\\.");
+
+ // Use file extension to detect format
+ if (splitSnapName.length > 1) {
+ String mode = splitSnapName[splitSnapName.length - 1];
+ return StreamMode.fromString(mode);
+ }
+
+ return StreamMode.CHECKED;
+ }
+
+ /**
+ * Certify the GZip stream integrity by checking the header
+ * for the GZip magic string
+ *
+ * @param f file to verify
+ * @return true if it has the correct GZip magic string
+ * @throws IOException
+ */
+ private static boolean isValidGZipStream(File f) throws IOException {
+ FileInputStream fis = null;
+ byte[] byteArray = new byte[2];
+
+ try {
+ fis = new FileInputStream(f);
+ fis.read(byteArray, 0, 2);
--- End diff --
See https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2623/artifact/build/test/findbugs/newPatchFindbugsWarnings.html
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/zookeeper/pull/690
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2610/
--none----none--
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:
https://github.com/apache/zookeeper/pull/690
@yisong-yue sorry for the spam. There is a problem with the pre-commit job. I won't disturb you anymore on this PR
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:
https://github.com/apache/zookeeper/pull/690
retest this please
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:
https://github.com/apache/zookeeper/pull/690
@anmolnar
One drawback I see with this structure is that it makes it harder to dynamically pick the deserialization mode based on a snapshot's filename, since `FileSnap` now manages a whole `snapDir` directory. Maybe we should separate the directory management logic from snapshot file (de)serialization in the `FileSnap` class.
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:
https://github.com/apache/zookeeper/pull/690
Couldn't reproduce this failure locally; it seems like a flaky test case.
retest this please
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:
https://github.com/apache/zookeeper/pull/690
retest this please
---
[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...
Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/690#discussion_r232363107
--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyCodec;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * Represent the Stream used in serialize and deserialize the Snapshot.
+ */
+public class SnapStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
+
+ public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
+ "zookeeper.snapshot.compression.method";
+
+ private static StreamMode streamMode =
+ StreamMode.fromString(
+ System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
+ StreamMode.DEFAULT_MODE.getName()));
+
+ static {
+ LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
+ }
+
+ public static enum StreamMode {
+ GZIP("gz"),
+ SNAPPY("snappy"),
+ CHECKED("");
+
+ public static final StreamMode DEFAULT_MODE = CHECKED;
+
+ private String name;
+
+ StreamMode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFileExtension() {
+ return name.isEmpty() ? "" : "." + name;
+ }
+
+ public static StreamMode fromString(String name) {
+ for (StreamMode c : values()) {
+ if (c.getName().compareToIgnoreCase(name) == 0) {
+ return c;
+ }
+ }
+ return DEFAULT_MODE;
+ }
+ }
+
+ /**
+ * Return the CheckedInputStream based on the extension of the fileName.
+ *
+ * @param fileName the file the InputStream read from
+ * @return the specific InputStream
+ * @throws IOException
+ */
+ public static CheckedInputStream getInputStream(File file) throws IOException {
+ FileInputStream fis = new FileInputStream(file);
+ InputStream is;
+ switch (getStreamMode(file.getName())) {
+ case GZIP:
+ is = new GZIPInputStream(fis);
+ break;
+ case SNAPPY:
+ is = new SnappyInputStream(fis);
+ break;
+ case CHECKED:
+ default:
+ is = new BufferedInputStream(fis);
+ }
+ return new CheckedInputStream(is, new Adler32());
+ }
+
+ /**
+ * Return the OutputStream based on predefined stream mode.
+ *
+ * @param fileName the file the OutputStream writes to
+ * @return the specific OutputStream
+ * @throws IOException
+ */
+ public static CheckedOutputStream getOutputStream(File file) throws IOException {
+ FileOutputStream fos = new FileOutputStream(file);
+ OutputStream os;
+ switch (streamMode) {
+ case GZIP:
+ os = new GZIPOutputStream(fos);
+ break;
+ case SNAPPY:
+ os = new SnappyOutputStream(fos);
+ break;
+ case CHECKED:
+ default:
+ os = new BufferedOutputStream(fos);
+ }
+ return new CheckedOutputStream(os, new Adler32());
+ }
+
+ /**
+ * Write specific seal to the OutputArchive and close the OutputStream.
+ * Currently, only CheckedOutputStream will write it's checkSum to the
+ * end of the stream.
+ *
+ */
+ public static void sealStream(CheckedOutputStream os, OutputArchive oa)
+ throws IOException {
+ long val = os.getChecksum().getValue();
+ oa.writeLong(val, "val");
+ oa.writeString("/", "path");
+ }
+
+ /**
+ * Verify the integrity of the seal, only CheckedInputStream will verify
+ * the checkSum of the content.
+ *
+ */
+ static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
+ throws IOException {
+ long checkSum = is.getChecksum().getValue();
+ long val = ia.readLong("val");
+ if (val != checkSum) {
+ throw new IOException("CRC corruption");
+ }
+ }
+
+ /**
+ * Verifies that the file is a valid snapshot. Snapshot may be invalid if
+ * it's incomplete as in a situation when the server dies while in the
+ * process of storing a snapshot. Any files that are improperly formated
+ * or corrupted are invalid. Any file that is not a snapshot is also an
+ * invalid snapshot.
+ *
+ * @param file file to verify
+ * @return true if the snapshot is valid
+ * @throws IOException
+ */
+ public static boolean isValidSnapshot(File file) throws IOException {
+ if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
+ return false;
+ }
+
+ String fileName = file.getName();
+ if (Util.getZxidFromName(fileName, "snapshot") == -1) {
+ return false;
+ }
+
+ boolean isValid = false;
+ switch (getStreamMode(fileName)) {
+ case GZIP:
+ isValid = isValidGZipStream(file);
+ break;
+ case SNAPPY:
+ isValid = isValidSnappyStream(file);
+ break;
+ case CHECKED:
+ default:
+ isValid = isValidCheckedStream(file);
+ }
+ return isValid;
+ }
+
+ public static void setStreamMode(StreamMode mode) {
+ streamMode = mode;
+ }
+
+ public static StreamMode getStreamMode() {
+ return streamMode;
+ }
+
+ /**
+ * Detect the stream mode from file name extension
+ *
+ * @param fileName
+ * @return
+ */
+ public static StreamMode getStreamMode(String fileName) {
+ String[] splitSnapName = fileName.split("\\.");
+
+ // Use file extension to detect format
+ if (splitSnapName.length > 1) {
+ String mode = splitSnapName[splitSnapName.length - 1];
+ return StreamMode.fromString(mode);
+ }
+
+ return StreamMode.CHECKED;
+ }
+
+ /**
+ * Certify the GZip stream integrity by checking the header
+ * for the GZip magic string
+ *
+ * @param f file to verify
+ * @return true if it has the correct GZip magic string
+ * @throws IOException
+ */
+ private static boolean isValidGZipStream(File f) throws IOException {
+ FileInputStream fis = null;
+ byte[] byteArray = new byte[2];
+
+ try {
+ fis = new FileInputStream(f);
+ fis.read(byteArray, 0, 2);
--- End diff --
This is actually a bug to be fixed.
You can, for instance, use DataInputStream#readFully, or use IOUtils if we have it on the classpath.
In this case it is enough to fail if the result is != 2.
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:
https://github.com/apache/zookeeper/pull/690
retest this please
---
[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...
Posted by nkalmar <gi...@git.apache.org>.
Github user nkalmar commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/690#discussion_r232325878
--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyCodec;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * Represent the Stream used in serialize and deserialize the Snapshot.
+ */
+public class SnapStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
+
+ public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
+ "zookeeper.snapshot.compression.method";
+
+ private static StreamMode streamMode =
+ StreamMode.fromString(
+ System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
+ StreamMode.DEFAULT_MODE.getName()));
+
+ static {
+ LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
+ }
+
+ public static enum StreamMode {
+ GZIP("gz"),
+ SNAPPY("snappy"),
+ CHECKED("");
+
+ public static final StreamMode DEFAULT_MODE = CHECKED;
+
+ private String name;
+
+ StreamMode(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFileExtension() {
+ return name.isEmpty() ? "" : "." + name;
+ }
+
+ public static StreamMode fromString(String name) {
+ for (StreamMode c : values()) {
+ if (c.getName().compareToIgnoreCase(name) == 0) {
+ return c;
+ }
+ }
+ return DEFAULT_MODE;
+ }
+ }
+
+ /**
+ * Return the CheckedInputStream based on the extension of the fileName.
+ *
+ * @param fileName the file the InputStream read from
+ * @return the specific InputStream
+ * @throws IOException
+ */
+ public static CheckedInputStream getInputStream(File file) throws IOException {
+ FileInputStream fis = new FileInputStream(file);
+ InputStream is;
+ switch (getStreamMode(file.getName())) {
+ case GZIP:
+ is = new GZIPInputStream(fis);
+ break;
+ case SNAPPY:
+ is = new SnappyInputStream(fis);
+ break;
+ case CHECKED:
+ default:
+ is = new BufferedInputStream(fis);
+ }
+ return new CheckedInputStream(is, new Adler32());
+ }
+
+ /**
+ * Return the OutputStream based on predefined stream mode.
+ *
+ * @param fileName the file the OutputStream writes to
+ * @return the specific OutputStream
+ * @throws IOException
+ */
+ public static CheckedOutputStream getOutputStream(File file) throws IOException {
+ FileOutputStream fos = new FileOutputStream(file);
+ OutputStream os;
+ switch (streamMode) {
+ case GZIP:
+ os = new GZIPOutputStream(fos);
+ break;
+ case SNAPPY:
+ os = new SnappyOutputStream(fos);
+ break;
+ case CHECKED:
+ default:
+ os = new BufferedOutputStream(fos);
+ }
+ return new CheckedOutputStream(os, new Adler32());
+ }
+
+ /**
+ * Write specific seal to the OutputArchive and close the OutputStream.
+ * Currently, only CheckedOutputStream will write it's checkSum to the
+ * end of the stream.
+ *
+ */
+ public static void sealStream(CheckedOutputStream os, OutputArchive oa)
+ throws IOException {
+ long val = os.getChecksum().getValue();
+ oa.writeLong(val, "val");
+ oa.writeString("/", "path");
+ }
+
+ /**
+ * Verify the integrity of the seal, only CheckedInputStream will verify
+ * the checkSum of the content.
+ *
+ */
+ static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
+ throws IOException {
+ long checkSum = is.getChecksum().getValue();
+ long val = ia.readLong("val");
+ if (val != checkSum) {
+ throw new IOException("CRC corruption");
+ }
+ }
+
+ /**
+ * Verifies that the file is a valid snapshot. Snapshot may be invalid if
+ * it's incomplete as in a situation when the server dies while in the
+ * process of storing a snapshot. Any files that are improperly formated
+ * or corrupted are invalid. Any file that is not a snapshot is also an
+ * invalid snapshot.
+ *
+ * @param file file to verify
+ * @return true if the snapshot is valid
+ * @throws IOException
+ */
+ public static boolean isValidSnapshot(File file) throws IOException {
+ if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
+ return false;
+ }
+
+ String fileName = file.getName();
+ if (Util.getZxidFromName(fileName, "snapshot") == -1) {
+ return false;
+ }
+
+ boolean isValid = false;
+ switch (getStreamMode(fileName)) {
+ case GZIP:
+ isValid = isValidGZipStream(file);
+ break;
+ case SNAPPY:
+ isValid = isValidSnappyStream(file);
+ break;
+ case CHECKED:
+ default:
+ isValid = isValidCheckedStream(file);
+ }
+ return isValid;
+ }
+
+ public static void setStreamMode(StreamMode mode) {
+ streamMode = mode;
+ }
+
+ public static StreamMode getStreamMode() {
+ return streamMode;
+ }
+
+ /**
+ * Detect the stream mode from file name extension
+ *
+ * @param fileName
+ * @return
+ */
+ public static StreamMode getStreamMode(String fileName) {
+ String[] splitSnapName = fileName.split("\\.");
+
+ // Use file extension to detect format
+ if (splitSnapName.length > 1) {
+ String mode = splitSnapName[splitSnapName.length - 1];
+ return StreamMode.fromString(mode);
+ }
+
+ return StreamMode.CHECKED;
+ }
+
+ /**
+ * Certify the GZip stream integrity by checking the header
+ * for the GZip magic string
+ *
+ * @param f file to verify
+ * @return true if it has the correct GZip magic string
+ * @throws IOException
+ */
+ private static boolean isValidGZipStream(File f) throws IOException {
+ FileInputStream fis = null;
+ byte[] byteArray = new byte[2];
+
+ try {
+ fis = new FileInputStream(f);
+ fis.read(byteArray, 0, 2);
+ ByteBuffer bb = ByteBuffer.wrap(byteArray);
+ byte[] magicHeader = new byte[2];
+ bb.get(magicHeader, 0, 2);
+ int magic = magicHeader[0] & 0xff | ((magicHeader[1] << 8) & 0xff00);
+ return magic == GZIPInputStream.GZIP_MAGIC;
+ } catch (FileNotFoundException e) {
+ LOG.error("Unable to open file " + f.getName() + " : ", e);
+ return false;
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ }
+
+ /**
+ * Certify the Snappy stream integrity by checking the header
+ * for the Snappy magic string
+ *
+ * @param f file to verify
+ * @return true if it has the correct Snappy magic string
+ * @throws IOException
+ */
+ private static boolean isValidSnappyStream(File f) throws IOException {
+ FileInputStream fis = null;
+ byte[] byteArray = new byte[SnappyCodec.MAGIC_LEN];
+ try {
+ fis = new FileInputStream(f);
+ fis.read(byteArray, 0, SnappyCodec.MAGIC_LEN);
--- End diff --
Again, findBugs doesn't like it. See above.
---
[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...
Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/690#discussion_r232864632
--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyCodec;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * Represents the stream used to serialize and deserialize a snapshot.
+ */
+public class SnapStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
+
+ /**
+  * Name of the system property that selects the snapshot compression method.
+  * NOTE(review): "SHAPSHOT" looks like a typo for "SNAPSHOT"; the identifier
+  * is left unchanged because the constant is public.
+  */
+ public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
+     "zookeeper.snapshot.compression.method";
+
+ /**
+  * Mode used when writing snapshots. Initialized from the system property;
+  * an unset or unrecognized value resolves to the default mode.
+  */
+ private static StreamMode streamMode = StreamMode.fromString(
+     System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE, StreamMode.DEFAULT_MODE.getName()));
+
+ static {
+     LOG.info("{}={}", ZOOKEEPER_SHAPSHOT_STREAM_MODE, streamMode);
+ }
+
+ /**
+  * Supported snapshot stream formats. Each mode carries the name that is
+  * used both as the configuration value and as the file extension.
+  */
+ public enum StreamMode {
+     GZIP("gz"),
+     SNAPPY("snappy"),
+     CHECKED("");
+
+     /** Mode returned when a name matches none of the constants. */
+     public static final StreamMode DEFAULT_MODE = CHECKED;
+
+     private final String name;
+
+     StreamMode(String name) {
+         this.name = name;
+     }
+
+     /**
+      * @return the configuration/extension name of this mode (empty for CHECKED)
+      */
+     public String getName() {
+         return name;
+     }
+
+     /**
+      * @return the name prefixed with a dot, or an empty string when the
+      *         mode has no name
+      */
+     public String getFileExtension() {
+         return name.isEmpty() ? "" : "." + name;
+     }
+
+     /**
+      * Looks up a mode by name, ignoring case.
+      *
+      * @param name mode name to look up; may be null
+      * @return the matching mode, or {@link #DEFAULT_MODE} when the name is
+      *         null or matches no mode
+      */
+     public static StreamMode fromString(String name) {
+         for (StreamMode c : values()) {
+             // equalsIgnoreCase is null-safe, unlike compareToIgnoreCase.
+             if (c.getName().equalsIgnoreCase(name)) {
+                 return c;
+             }
+         }
+         return DEFAULT_MODE;
+     }
+ }
+
+ /**
+  * Opens the file for reading, picking the decoder from the extension of
+  * the file name, and wraps it in an Adler32 {@link CheckedInputStream}.
+  *
+  * @param file the file to read from
+  * @return a checked stream over the (possibly decompressed) file content
+  * @throws IOException if the file cannot be opened or the decoder cannot
+  *         be initialized
+  */
+ public static CheckedInputStream getInputStream(File file) throws IOException {
+     FileInputStream fis = new FileInputStream(file);
+     try {
+         InputStream is;
+         switch (getStreamMode(file.getName())) {
+             case GZIP:
+                 is = new GZIPInputStream(fis);
+                 break;
+             case SNAPPY:
+                 is = new SnappyInputStream(fis);
+                 break;
+             case CHECKED:
+             default:
+                 is = new BufferedInputStream(fis);
+         }
+         return new CheckedInputStream(is, new Adler32());
+     } catch (IOException | RuntimeException e) {
+         // A decoder constructor can fail (GZIPInputStream reads the header
+         // eagerly); close the file handle instead of leaking it.
+         try {
+             fis.close();
+         } catch (IOException closeFailure) {
+             e.addSuppressed(closeFailure);
+         }
+         throw e;
+     }
+ }
+
+ /**
+  * Opens the file for writing, picking the encoder from the configured
+  * stream mode, and wraps it in an Adler32 {@link CheckedOutputStream}.
+  *
+  * @param file the file to write to
+  * @return a checked stream that encodes into the file
+  * @throws IOException if the file cannot be opened or the encoder cannot
+  *         be initialized
+  */
+ public static CheckedOutputStream getOutputStream(File file) throws IOException {
+     FileOutputStream fos = new FileOutputStream(file);
+     try {
+         OutputStream os;
+         switch (streamMode) {
+             case GZIP:
+                 os = new GZIPOutputStream(fos);
+                 break;
+             case SNAPPY:
+                 os = new SnappyOutputStream(fos);
+                 break;
+             case CHECKED:
+             default:
+                 os = new BufferedOutputStream(fos);
+         }
+         return new CheckedOutputStream(os, new Adler32());
+     } catch (IOException | RuntimeException e) {
+         // An encoder constructor can fail (GZIPOutputStream writes its
+         // header eagerly); close the file handle instead of leaking it.
+         try {
+             fos.close();
+         } catch (IOException closeFailure) {
+             e.addSuppressed(closeFailure);
+         }
+         throw e;
+     }
+ }
+
+ /**
+  * Writes the seal to the archive: the current checksum of the stream as a
+  * long tagged "val", followed by the string "/" tagged "path". The stream
+  * itself is not closed here.
+  *
+  * @param os stream whose checksum is recorded
+  * @param oa archive the seal is written to
+  * @throws IOException if writing to the archive fails
+  */
+ public static void sealStream(CheckedOutputStream os, OutputArchive oa)
+         throws IOException {
+     oa.writeLong(os.getChecksum().getValue(), "val");
+     oa.writeString("/", "path");
+ }
+
+ /**
+  * Compares the checksum accumulated by the stream with the value stored
+  * in the archive under the tag "val".
+  *
+  * @param is stream whose running checksum is taken
+  * @param ia archive the stored checksum is read from
+  * @throws IOException if the two values differ or the read fails
+  */
+ static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
+         throws IOException {
+     // Sample the running checksum before reading the stored value.
+     // NOTE(review): presumably the archive reads through this same stream,
+     // so reading first would alter the checksum - confirm against callers.
+     long computed = is.getChecksum().getValue();
+     long stored = ia.readLong("val");
+     if (stored == computed) {
+         return;
+     }
+     throw new IOException("CRC corruption");
+ }
+
+ /**
+  * Verifies that the file is a valid snapshot. A snapshot may be invalid if
+  * it is incomplete, for example when the server died while storing it.
+  * Any file that is improperly formatted or corrupted is invalid, and so is
+  * any file that is not a snapshot.
+  *
+  * @param file file to verify; may be null
+  * @return true if the snapshot is valid; false for a null file or a name
+  *         that does not carry a snapshot zxid
+  * @throws IOException if the file cannot be read
+  */
+ public static boolean isValidSnapshot(File file) throws IOException {
+     if (file == null) {
+         return false;
+     }
+
+     // One name check is enough; the original repeated it with a literal
+     // "snapshot" prefix (assumed equal to SNAPSHOT_FILE_PREFIX - confirm).
+     String fileName = file.getName();
+     if (Util.getZxidFromName(fileName, FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
+         return false;
+     }
+
+     // Dispatch to the format-specific check chosen by the file extension.
+     switch (getStreamMode(fileName)) {
+         case GZIP:
+             return isValidGZipStream(file);
+         case SNAPPY:
+             return isValidSnappyStream(file);
+         case CHECKED:
+         default:
+             return isValidCheckedStream(file);
+     }
+ }
+
+ /**
+  * Replaces the mode used when opening snapshot output streams.
+  *
+  * @param mode the stream mode to write with
+  */
+ public static void setStreamMode(StreamMode mode) {
+     SnapStream.streamMode = mode;
+ }
+
+ /**
+  * @return the mode currently configured for writing snapshots
+  */
+ public static StreamMode getStreamMode() {
+     return SnapStream.streamMode;
+ }
+
+ /**
+  * Detects the stream mode from the extension of a snapshot file name.
+  * The last dot-separated token of the name is looked up with
+  * {@link StreamMode#fromString(String)}.
+  *
+  * @param fileName name of the snapshot file
+  * @return the mode matching the extension, or {@link StreamMode#CHECKED}
+  *         when the name has no extension
+  */
+ public static StreamMode getStreamMode(String fileName) {
+     String[] nameParts = fileName.split("\\.");
+     if (nameParts.length <= 1) {
+         // Nothing after a '.' to inspect: fall back to CHECKED.
+         return StreamMode.CHECKED;
+     }
+     return StreamMode.fromString(nameParts[nameParts.length - 1]);
+ }
+
+ /**
+ * Certify the GZip stream integrity by checking the header
+ * for the GZip magic string
+ *
+ * @param f file to verify
+ * @return true if it has the correct GZip magic string
+ * @throws IOException
+ */
+ private static boolean isValidGZipStream(File f) throws IOException {
+ FileInputStream fis = null;
+ byte[] byteArray = new byte[2];
+
+ try {
+ fis = new FileInputStream(f);
+ fis.read(byteArray, 0, 2);
--- End diff --
Fixed Findbugs warnings and changed to using try-with-resources!
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:
https://github.com/apache/zookeeper/pull/690
@anmolnar
I agree that concrete classes are a lot easier to test. Though in this case, I think it's more appropriate for SnapStream to be a utility class that does not hold any state. (It contains a global, non-changing "state" `streamMode` now. It's a bit less ideal, but avoids having to pass in the same streamMode value every time.)
For classes that need to hold some mutable state, I agree that concrete class is the way to go.
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/zookeeper/pull/690
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2609/
--none----none--
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:
https://github.com/apache/zookeeper/pull/690
Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2606/
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:
https://github.com/apache/zookeeper/pull/690
@yisong-yue
Yeah, that's true. Perhaps it would be too much hassle and I'm trying to over-engineer things here. Let's just leave it as it is now and only do refactoring if we could benefit from it.
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:
https://github.com/apache/zookeeper/pull/690
@yisong-yue
Global state can still be maintained in a static field of a concrete class, it shouldn't be a problem.
Though you're probably right that `SnapStream` cannot maintain state on its own, I believe that's because the methods should actually belong to `FileSnap`. Taking one step back and looking at how `FileSnap` works, I think - in terms of inheritance - the following would best represent the relationships:
```
SnapShot (interface) - represents a single snapshot of the DataTree
|
\\ _ FileSnap (abstract) - implements common methods for all file-based snapshots
|
\\ _ GzipFileSnap (concrete) - Gzip-compressed file-based snapshot
|
\\ _ SnappyFileSnap (concrete) - Snappy compressed file-based snapshot
|
...
```
In this example, your new methods should go into the right concrete class whether it's Gzip, Snappy, etc. related or in the abstract `FileSnap` class if it's some common method.
Of course, this would require a bit of refactoring, which might not be trivial, so I wouldn't say we have to do this. I just wanted to give you an idea and get some feedback from you and the community.
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:
https://github.com/apache/zookeeper/pull/690
👍 Thanks for the feedback! 😄
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:
https://github.com/apache/zookeeper/pull/690
Here are some benchmark results from runs on my laptop:
```
(1)
uncompressed snappy gzip
Size: ~ 15.9 MB ~ 4.1 MB ~ 0.3 MB
Serialize (run 1): 110 ms 36 ms 134 ms
Serialize (run 2): 96 ms 33 ms 135 ms
Deserialize (run 1): 40 ms 25 ms 76 ms
Deserialize (run 2): 39 ms 26 ms 74 ms
(2)
uncompressed snappy gzip
Size: ~ 382 MB ~ 139 MB ~ 104 MB
Serialize (run 1): 3393 ms 3279 ms 22096 ms
Serialize (run 2): 3177 ms 3223 ms 24729 ms
Deserialize (run 1): 4239 ms 3532 ms 14122 ms
Deserialize (run 2): 5750 ms 4722 ms 16654 ms
(3)
uncompressed snappy gzip
Size: ~ 1.2 GB ~ 560 MB ~ 330 MB
Serialize (run 1): 24250 ms 26575 ms 208570 ms
Serialize (run 2): 23717 ms 25648 ms 197400 ms
Deserialize (run 1): 51510 ms 67274 ms 189142 ms
Deserialize (run 2): 50185 ms 64691 ms 181521 ms
```
---
[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...
Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:
https://github.com/apache/zookeeper/pull/690
retest this please
---
[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...
Posted by nkalmar <gi...@git.apache.org>.
Github user nkalmar commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/690#discussion_r232325666
--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.persistence;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyCodec;
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+/**
+ * Represents the stream used to serialize and deserialize a snapshot.
+ */
+public class SnapStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
+
+ /**
+  * Name of the system property that selects the snapshot compression method.
+  * NOTE(review): "SHAPSHOT" looks like a typo for "SNAPSHOT"; the identifier
+  * is left unchanged because the constant is public.
+  */
+ public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
+     "zookeeper.snapshot.compression.method";
+
+ /**
+  * Mode used when writing snapshots. Initialized from the system property;
+  * an unset or unrecognized value resolves to the default mode.
+  */
+ private static StreamMode streamMode = StreamMode.fromString(
+     System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE, StreamMode.DEFAULT_MODE.getName()));
+
+ static {
+     LOG.info("{}={}", ZOOKEEPER_SHAPSHOT_STREAM_MODE, streamMode);
+ }
+
+ /**
+  * Supported snapshot stream formats. Each mode carries the name that is
+  * used both as the configuration value and as the file extension.
+  */
+ public enum StreamMode {
+     GZIP("gz"),
+     SNAPPY("snappy"),
+     CHECKED("");
+
+     /** Mode returned when a name matches none of the constants. */
+     public static final StreamMode DEFAULT_MODE = CHECKED;
+
+     private final String name;
+
+     StreamMode(String name) {
+         this.name = name;
+     }
+
+     /**
+      * @return the configuration/extension name of this mode (empty for CHECKED)
+      */
+     public String getName() {
+         return name;
+     }
+
+     /**
+      * @return the name prefixed with a dot, or an empty string when the
+      *         mode has no name
+      */
+     public String getFileExtension() {
+         return name.isEmpty() ? "" : "." + name;
+     }
+
+     /**
+      * Looks up a mode by name, ignoring case.
+      *
+      * @param name mode name to look up; may be null
+      * @return the matching mode, or {@link #DEFAULT_MODE} when the name is
+      *         null or matches no mode
+      */
+     public static StreamMode fromString(String name) {
+         for (StreamMode c : values()) {
+             // equalsIgnoreCase is null-safe, unlike compareToIgnoreCase.
+             if (c.getName().equalsIgnoreCase(name)) {
+                 return c;
+             }
+         }
+         return DEFAULT_MODE;
+     }
+ }
+
+ /**
+  * Opens the file for reading, picking the decoder from the extension of
+  * the file name, and wraps it in an Adler32 {@link CheckedInputStream}.
+  *
+  * @param file the file to read from
+  * @return a checked stream over the (possibly decompressed) file content
+  * @throws IOException if the file cannot be opened or the decoder cannot
+  *         be initialized
+  */
+ public static CheckedInputStream getInputStream(File file) throws IOException {
+     FileInputStream fis = new FileInputStream(file);
+     try {
+         InputStream is;
+         switch (getStreamMode(file.getName())) {
+             case GZIP:
+                 is = new GZIPInputStream(fis);
+                 break;
+             case SNAPPY:
+                 is = new SnappyInputStream(fis);
+                 break;
+             case CHECKED:
+             default:
+                 is = new BufferedInputStream(fis);
+         }
+         return new CheckedInputStream(is, new Adler32());
+     } catch (IOException | RuntimeException e) {
+         // A decoder constructor can fail (GZIPInputStream reads the header
+         // eagerly); close the file handle instead of leaking it.
+         try {
+             fis.close();
+         } catch (IOException closeFailure) {
+             e.addSuppressed(closeFailure);
+         }
+         throw e;
+     }
+ }
+
+ /**
+  * Opens the file for writing, picking the encoder from the configured
+  * stream mode, and wraps it in an Adler32 {@link CheckedOutputStream}.
+  *
+  * @param file the file to write to
+  * @return a checked stream that encodes into the file
+  * @throws IOException if the file cannot be opened or the encoder cannot
+  *         be initialized
+  */
+ public static CheckedOutputStream getOutputStream(File file) throws IOException {
+     FileOutputStream fos = new FileOutputStream(file);
+     try {
+         OutputStream os;
+         switch (streamMode) {
+             case GZIP:
+                 os = new GZIPOutputStream(fos);
+                 break;
+             case SNAPPY:
+                 os = new SnappyOutputStream(fos);
+                 break;
+             case CHECKED:
+             default:
+                 os = new BufferedOutputStream(fos);
+         }
+         return new CheckedOutputStream(os, new Adler32());
+     } catch (IOException | RuntimeException e) {
+         // An encoder constructor can fail (GZIPOutputStream writes its
+         // header eagerly); close the file handle instead of leaking it.
+         try {
+             fos.close();
+         } catch (IOException closeFailure) {
+             e.addSuppressed(closeFailure);
+         }
+         throw e;
+     }
+ }
+
+ /**
+  * Writes the seal to the archive: the current checksum of the stream as a
+  * long tagged "val", followed by the string "/" tagged "path". The stream
+  * itself is not closed here.
+  *
+  * @param os stream whose checksum is recorded
+  * @param oa archive the seal is written to
+  * @throws IOException if writing to the archive fails
+  */
+ public static void sealStream(CheckedOutputStream os, OutputArchive oa)
+         throws IOException {
+     oa.writeLong(os.getChecksum().getValue(), "val");
+     oa.writeString("/", "path");
+ }
+
+ /**
+  * Compares the checksum accumulated by the stream with the value stored
+  * in the archive under the tag "val".
+  *
+  * @param is stream whose running checksum is taken
+  * @param ia archive the stored checksum is read from
+  * @throws IOException if the two values differ or the read fails
+  */
+ static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
+         throws IOException {
+     // Sample the running checksum before reading the stored value.
+     // NOTE(review): presumably the archive reads through this same stream,
+     // so reading first would alter the checksum - confirm against callers.
+     long computed = is.getChecksum().getValue();
+     long stored = ia.readLong("val");
+     if (stored == computed) {
+         return;
+     }
+     throw new IOException("CRC corruption");
+ }
+
+ /**
+  * Verifies that the file is a valid snapshot. A snapshot may be invalid if
+  * it is incomplete, for example when the server died while storing it.
+  * Any file that is improperly formatted or corrupted is invalid, and so is
+  * any file that is not a snapshot.
+  *
+  * @param file file to verify; may be null
+  * @return true if the snapshot is valid; false for a null file or a name
+  *         that does not carry a snapshot zxid
+  * @throws IOException if the file cannot be read
+  */
+ public static boolean isValidSnapshot(File file) throws IOException {
+     if (file == null) {
+         return false;
+     }
+
+     // One name check is enough; the original repeated it with a literal
+     // "snapshot" prefix (assumed equal to SNAPSHOT_FILE_PREFIX - confirm).
+     String fileName = file.getName();
+     if (Util.getZxidFromName(fileName, FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
+         return false;
+     }
+
+     // Dispatch to the format-specific check chosen by the file extension.
+     switch (getStreamMode(fileName)) {
+         case GZIP:
+             return isValidGZipStream(file);
+         case SNAPPY:
+             return isValidSnappyStream(file);
+         case CHECKED:
+         default:
+             return isValidCheckedStream(file);
+     }
+ }
+
+ /**
+  * Replaces the mode used when opening snapshot output streams.
+  *
+  * @param mode the stream mode to write with
+  */
+ public static void setStreamMode(StreamMode mode) {
+     SnapStream.streamMode = mode;
+ }
+
+ /**
+  * @return the mode currently configured for writing snapshots
+  */
+ public static StreamMode getStreamMode() {
+     return SnapStream.streamMode;
+ }
+
+ /**
+  * Detects the stream mode from the extension of a snapshot file name.
+  * The last dot-separated token of the name is looked up with
+  * {@link StreamMode#fromString(String)}.
+  *
+  * @param fileName name of the snapshot file
+  * @return the mode matching the extension, or {@link StreamMode#CHECKED}
+  *         when the name has no extension
+  */
+ public static StreamMode getStreamMode(String fileName) {
+     String[] nameParts = fileName.split("\\.");
+     if (nameParts.length <= 1) {
+         // Nothing after a '.' to inspect: fall back to CHECKED.
+         return StreamMode.CHECKED;
+     }
+     return StreamMode.fromString(nameParts[nameParts.length - 1]);
+ }
+
+ /**
+ * Certify the GZip stream integrity by checking the header
+ * for the GZip magic string
+ *
+ * @param f file to verify
+ * @return true if it has the correct GZip magic string
+ * @throws IOException
+ */
+ private static boolean isValidGZipStream(File f) throws IOException {
+ FileInputStream fis = null;
+ byte[] byteArray = new byte[2];
+
+ try {
+ fis = new FileInputStream(f);
+ fis.read(byteArray, 0, 2);
--- End diff --
Findbugs reports this, as you ignore the result of read().
Please add it to findbugsExcludeFile.xml to ignore it (or you can use the return value to check whether the read was successful here; that's not necessary, though).
---