You are viewing a plain-text version of this content. The canonical link for it is on the original archive page.
Posted to dev@zookeeper.apache.org by yisong-yue <gi...@git.apache.org> on 2018/11/07 07:28:21 UTC

[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

GitHub user yisong-yue opened a pull request:

    https://github.com/apache/zookeeper/pull/690

     ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yisong-yue/zookeeper ZOOKEEPER-3179

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/zookeeper/pull/690.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #690
    
----
commit 25ead63f620b99571850fdd7f827d685c79e3214
Author: Yisong Yue <yi...@...>
Date:   2018-11-07T05:04:32Z

    ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

commit 9fd3071e1487044f77172765222fe6f520961946
Author: Yisong Yue <yi...@...>
Date:   2018-11-07T07:24:34Z

    small improvements

----


---

[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/690#discussion_r232363306
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
    @@ -0,0 +1,336 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.persistence;
    +
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.RandomAccessFile;
    +import java.nio.ByteBuffer;
    +import java.util.Arrays;
    +import java.util.zip.Adler32;
    +import java.util.zip.CheckedInputStream;
    +import java.util.zip.CheckedOutputStream;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import org.apache.jute.InputArchive;
    +import org.apache.jute.OutputArchive;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.xerial.snappy.SnappyCodec;
    +import org.xerial.snappy.SnappyInputStream;
    +import org.xerial.snappy.SnappyOutputStream;
    +
    +/**
    + * Represent the Stream used in serialize and deserialize the Snapshot.
    + */
    +public class SnapStream {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
    +
    +    public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
    +        "zookeeper.snapshot.compression.method";
    +
    +    private static StreamMode streamMode =
    +        StreamMode.fromString(
    +            System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
    +                  StreamMode.DEFAULT_MODE.getName()));
    +
    +    static {
    +        LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
    +    }
    +
    +    public static enum StreamMode {
    +        GZIP("gz"),
    +        SNAPPY("snappy"),
    +        CHECKED("");
    +
    +        public static final StreamMode DEFAULT_MODE = CHECKED;
    +
    +        private String name;
    +
    +        StreamMode(String name) {
    +           this.name = name;
    +        }
    +
    +        public String getName() {
    +            return name;
    +        }
    +
    +        public String getFileExtension() {
    +            return name.isEmpty() ? "" : "." + name;
    +        }
    +
    +        public static StreamMode fromString(String name) {
    +            for (StreamMode c : values()) {
    +                if (c.getName().compareToIgnoreCase(name) == 0) {
    +                    return c;
    +                }
    +            }
    +            return DEFAULT_MODE;
    +        }
    +    }
    +
    +    /**
    +     * Return the CheckedInputStream based on the extension of the fileName.
    +     *
    +     * @param fileName the file the InputStream read from
    +     * @return the specific InputStream
    +     * @throws IOException
    +     */
    +    public static CheckedInputStream getInputStream(File file) throws IOException {
    +        FileInputStream fis = new FileInputStream(file);
    +        InputStream is;
    +        switch (getStreamMode(file.getName())) {
    +            case GZIP:
    +                is = new GZIPInputStream(fis);
    +                break;
    +            case SNAPPY:
    +                is = new SnappyInputStream(fis);
    +                break;
    +            case CHECKED:
    +            default:
    +                is = new BufferedInputStream(fis);
    +        }
    +        return new CheckedInputStream(is, new Adler32());
    +    }
    +
    +    /**
    +     * Return the OutputStream based on predefined stream mode.
    +     *
    +     * @param fileName the file the OutputStream writes to
    +     * @return the specific OutputStream
    +     * @throws IOException
    +     */
    +    public static CheckedOutputStream getOutputStream(File file) throws IOException {
    +        FileOutputStream fos = new FileOutputStream(file);
    +        OutputStream os;
    +        switch (streamMode) {
    +            case GZIP:
    +                os = new GZIPOutputStream(fos);
    +                break;
    +            case SNAPPY:
    +                os = new SnappyOutputStream(fos);
    +                break;
    +            case CHECKED:
    +            default:
    +                os = new BufferedOutputStream(fos);
    +        }
    +        return new CheckedOutputStream(os, new Adler32());
    +    }
    +
    +    /**
    +     * Write specific seal to the OutputArchive and close the OutputStream.
    +     * Currently, only CheckedOutputStream will write it's checkSum to the
    +     * end of the stream.
    +     *
    +     */
    +    public static void sealStream(CheckedOutputStream os, OutputArchive oa)
    +            throws IOException {
    +        long val = os.getChecksum().getValue();
    +        oa.writeLong(val, "val");
    +        oa.writeString("/", "path");
    +    }
    +
    +    /**
    +     * Verify the integrity of the seal, only CheckedInputStream will verify
    +     * the checkSum of the content.
    +     *
    +     */
    +    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
    +            throws IOException {
    +        long checkSum = is.getChecksum().getValue();
    +        long val = ia.readLong("val");
    +        if (val != checkSum) {
    +            throw new IOException("CRC corruption");
    +        }
    +    }
    +
    +    /**
    +     * Verifies that the file is a valid snapshot. Snapshot may be invalid if
    +     * it's incomplete as in a situation when the server dies while in the
    +     * process of storing a snapshot. Any files that are improperly formated
    +     * or corrupted are invalid. Any file that is not a snapshot is also an
    +     * invalid snapshot.
    +     *
    +     * @param file file to verify
    +     * @return true if the snapshot is valid
    +     * @throws IOException
    +     */
    +    public static boolean isValidSnapshot(File file) throws IOException {
    +        if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
    +            return false;
    +        }
    +
    +        String fileName = file.getName();
    +        if (Util.getZxidFromName(fileName, "snapshot") == -1) {
    +            return false;
    +        }
    +
    +        boolean isValid = false;
    +        switch (getStreamMode(fileName)) {
    +            case GZIP:
    +                isValid = isValidGZipStream(file);
    +                break;
    +            case SNAPPY:
    +                isValid = isValidSnappyStream(file);
    +                break;
    +            case CHECKED:
    +            default:
    +                isValid = isValidCheckedStream(file);
    +        }
    +        return isValid;
    +    }
    +
    +    public static void setStreamMode(StreamMode mode) {
    +        streamMode = mode;
    +    }
    +
    +    public static StreamMode getStreamMode() {
    +        return streamMode;
    +    }
    +
    +    /**
    +     * Detect the stream mode from file name extension
    +     *
    +     * @param fileName
    +     * @return
    +     */
    +    public static StreamMode getStreamMode(String fileName) {
    +        String[] splitSnapName = fileName.split("\\.");
    +
    +        // Use file extension to detect format
    +        if (splitSnapName.length > 1) {
    +            String mode = splitSnapName[splitSnapName.length - 1];
    +            return StreamMode.fromString(mode);
    +        }
    +
    +        return StreamMode.CHECKED;
    +    }
    +
    +    /**
    +     * Certify the GZip stream integrity by checking the header
    +     * for the GZip magic string
    +     *
    +     * @param f file to verify
    +     * @return true if it has the correct GZip magic string
    +     * @throws IOException
    +     */
    +    private static boolean isValidGZipStream(File f) throws IOException {
    +        FileInputStream fis = null;
    +        byte[] byteArray = new byte[2];
    +
    +        try {
    +            fis = new FileInputStream(f);
    +            fis.read(byteArray, 0, 2);
    --- End diff --
    
    We can also use try-with-resources here, so the stream is closed automatically.


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2608/



---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    Squashed the two commits into one.


---

[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by nkalmar <gi...@git.apache.org>.
Github user nkalmar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/690#discussion_r232326140
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
    @@ -0,0 +1,336 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.persistence;
    +
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.RandomAccessFile;
    +import java.nio.ByteBuffer;
    +import java.util.Arrays;
    +import java.util.zip.Adler32;
    +import java.util.zip.CheckedInputStream;
    +import java.util.zip.CheckedOutputStream;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import org.apache.jute.InputArchive;
    +import org.apache.jute.OutputArchive;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.xerial.snappy.SnappyCodec;
    +import org.xerial.snappy.SnappyInputStream;
    +import org.xerial.snappy.SnappyOutputStream;
    +
    +/**
    + * Represent the Stream used in serialize and deserialize the Snapshot.
    + */
    +public class SnapStream {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
    +
    +    public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
    +        "zookeeper.snapshot.compression.method";
    +
    +    private static StreamMode streamMode =
    +        StreamMode.fromString(
    +            System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
    +                  StreamMode.DEFAULT_MODE.getName()));
    +
    +    static {
    +        LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
    +    }
    +
    +    public static enum StreamMode {
    +        GZIP("gz"),
    +        SNAPPY("snappy"),
    +        CHECKED("");
    +
    +        public static final StreamMode DEFAULT_MODE = CHECKED;
    +
    +        private String name;
    +
    +        StreamMode(String name) {
    +           this.name = name;
    +        }
    +
    +        public String getName() {
    +            return name;
    +        }
    +
    +        public String getFileExtension() {
    +            return name.isEmpty() ? "" : "." + name;
    +        }
    +
    +        public static StreamMode fromString(String name) {
    +            for (StreamMode c : values()) {
    +                if (c.getName().compareToIgnoreCase(name) == 0) {
    +                    return c;
    +                }
    +            }
    +            return DEFAULT_MODE;
    +        }
    +    }
    +
    +    /**
    +     * Return the CheckedInputStream based on the extension of the fileName.
    +     *
    +     * @param fileName the file the InputStream read from
    +     * @return the specific InputStream
    +     * @throws IOException
    +     */
    +    public static CheckedInputStream getInputStream(File file) throws IOException {
    +        FileInputStream fis = new FileInputStream(file);
    +        InputStream is;
    +        switch (getStreamMode(file.getName())) {
    +            case GZIP:
    +                is = new GZIPInputStream(fis);
    +                break;
    +            case SNAPPY:
    +                is = new SnappyInputStream(fis);
    +                break;
    +            case CHECKED:
    +            default:
    +                is = new BufferedInputStream(fis);
    +        }
    +        return new CheckedInputStream(is, new Adler32());
    +    }
    +
    +    /**
    +     * Return the OutputStream based on predefined stream mode.
    +     *
    +     * @param fileName the file the OutputStream writes to
    +     * @return the specific OutputStream
    +     * @throws IOException
    +     */
    +    public static CheckedOutputStream getOutputStream(File file) throws IOException {
    +        FileOutputStream fos = new FileOutputStream(file);
    +        OutputStream os;
    +        switch (streamMode) {
    +            case GZIP:
    +                os = new GZIPOutputStream(fos);
    +                break;
    +            case SNAPPY:
    +                os = new SnappyOutputStream(fos);
    +                break;
    +            case CHECKED:
    +            default:
    +                os = new BufferedOutputStream(fos);
    +        }
    +        return new CheckedOutputStream(os, new Adler32());
    +    }
    +
    +    /**
    +     * Write specific seal to the OutputArchive and close the OutputStream.
    +     * Currently, only CheckedOutputStream will write it's checkSum to the
    +     * end of the stream.
    +     *
    +     */
    +    public static void sealStream(CheckedOutputStream os, OutputArchive oa)
    +            throws IOException {
    +        long val = os.getChecksum().getValue();
    +        oa.writeLong(val, "val");
    +        oa.writeString("/", "path");
    +    }
    +
    +    /**
    +     * Verify the integrity of the seal, only CheckedInputStream will verify
    +     * the checkSum of the content.
    +     *
    +     */
    +    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
    +            throws IOException {
    +        long checkSum = is.getChecksum().getValue();
    +        long val = ia.readLong("val");
    +        if (val != checkSum) {
    +            throw new IOException("CRC corruption");
    +        }
    +    }
    +
    +    /**
    +     * Verifies that the file is a valid snapshot. Snapshot may be invalid if
    +     * it's incomplete as in a situation when the server dies while in the
    +     * process of storing a snapshot. Any files that are improperly formated
    +     * or corrupted are invalid. Any file that is not a snapshot is also an
    +     * invalid snapshot.
    +     *
    +     * @param file file to verify
    +     * @return true if the snapshot is valid
    +     * @throws IOException
    +     */
    +    public static boolean isValidSnapshot(File file) throws IOException {
    +        if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
    +            return false;
    +        }
    +
    +        String fileName = file.getName();
    +        if (Util.getZxidFromName(fileName, "snapshot") == -1) {
    +            return false;
    +        }
    +
    +        boolean isValid = false;
    +        switch (getStreamMode(fileName)) {
    +            case GZIP:
    +                isValid = isValidGZipStream(file);
    +                break;
    +            case SNAPPY:
    +                isValid = isValidSnappyStream(file);
    +                break;
    +            case CHECKED:
    +            default:
    +                isValid = isValidCheckedStream(file);
    +        }
    +        return isValid;
    +    }
    +
    +    public static void setStreamMode(StreamMode mode) {
    +        streamMode = mode;
    +    }
    +
    +    public static StreamMode getStreamMode() {
    +        return streamMode;
    +    }
    +
    +    /**
    +     * Detect the stream mode from file name extension
    +     *
    +     * @param fileName
    +     * @return
    +     */
    +    public static StreamMode getStreamMode(String fileName) {
    +        String[] splitSnapName = fileName.split("\\.");
    +
    +        // Use file extension to detect format
    +        if (splitSnapName.length > 1) {
    +            String mode = splitSnapName[splitSnapName.length - 1];
    +            return StreamMode.fromString(mode);
    +        }
    +
    +        return StreamMode.CHECKED;
    +    }
    +
    +    /**
    +     * Certify the GZip stream integrity by checking the header
    +     * for the GZip magic string
    +     *
    +     * @param f file to verify
    +     * @return true if it has the correct GZip magic string
    +     * @throws IOException
    +     */
    +    private static boolean isValidGZipStream(File f) throws IOException {
    +        FileInputStream fis = null;
    +        byte[] byteArray = new byte[2];
    +
    +        try {
    +            fis = new FileInputStream(f);
    +            fis.read(byteArray, 0, 2);
    --- End diff --
    
    See the new FindBugs warnings reported for this patch: https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2623/artifact/build/test/findbugs/newPatchFindbugsWarnings.html 


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2610/
    --none----none--


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    @yisong-yue Sorry for the spam; there is a problem with the pre-commit job. I won't disturb you anymore on this PR.


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by lvfangmin <gi...@git.apache.org>.
Github user lvfangmin commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    retest this please


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    @anmolnar 
    One drawback I see with this structure is that it makes it harder to dynamically pick the deserialization mode based on a snapshot's file name, since `FileSnap` now manages a whole `snapDir` directory. Maybe we should separate the directory-management logic from snapshot file (de)serialization in the `FileSnap` class.


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    Couldn't reproduce this failure locally; it seems like a flaky test case.
    retest this please


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    retest this please


---

[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to reduce the disk IO

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/690#discussion_r232363107
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
    @@ -0,0 +1,336 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.persistence;
    +
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.RandomAccessFile;
    +import java.nio.ByteBuffer;
    +import java.util.Arrays;
    +import java.util.zip.Adler32;
    +import java.util.zip.CheckedInputStream;
    +import java.util.zip.CheckedOutputStream;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import org.apache.jute.InputArchive;
    +import org.apache.jute.OutputArchive;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.xerial.snappy.SnappyCodec;
    +import org.xerial.snappy.SnappyInputStream;
    +import org.xerial.snappy.SnappyOutputStream;
    +
    +/**
    + * Represent the Stream used in serialize and deserialize the Snapshot.
    + */
    +public class SnapStream {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
    +
    +    public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
    +        "zookeeper.snapshot.compression.method";
    +
    +    private static StreamMode streamMode =
    +        StreamMode.fromString(
    +            System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
    +                  StreamMode.DEFAULT_MODE.getName()));
    +
    +    static {
    +        LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
    +    }
    +
    +    public static enum StreamMode {
    +        GZIP("gz"),
    +        SNAPPY("snappy"),
    +        CHECKED("");
    +
    +        public static final StreamMode DEFAULT_MODE = CHECKED;
    +
    +        private String name;
    +
    +        StreamMode(String name) {
    +           this.name = name;
    +        }
    +
    +        public String getName() {
    +            return name;
    +        }
    +
    +        public String getFileExtension() {
    +            return name.isEmpty() ? "" : "." + name;
    +        }
    +
    +        public static StreamMode fromString(String name) {
    +            for (StreamMode c : values()) {
    +                if (c.getName().compareToIgnoreCase(name) == 0) {
    +                    return c;
    +                }
    +            }
    +            return DEFAULT_MODE;
    +        }
    +    }
    +
    +    /**
    +     * Return the CheckedInputStream based on the extension of the fileName.
    +     *
    +     * @param fileName the file the InputStream read from
    +     * @return the specific InputStream
    +     * @throws IOException
    +     */
    +    public static CheckedInputStream getInputStream(File file) throws IOException {
    +        FileInputStream fis = new FileInputStream(file);
    +        InputStream is;
    +        switch (getStreamMode(file.getName())) {
    +            case GZIP:
    +                is = new GZIPInputStream(fis);
    +                break;
    +            case SNAPPY:
    +                is = new SnappyInputStream(fis);
    +                break;
    +            case CHECKED:
    +            default:
    +                is = new BufferedInputStream(fis);
    +        }
    +        return new CheckedInputStream(is, new Adler32());
    +    }
    +
    +    /**
    +     * Return the OutputStream based on predefined stream mode.
    +     *
    +     * @param fileName the file the OutputStream writes to
    +     * @return the specific OutputStream
    +     * @throws IOException
    +     */
    +    public static CheckedOutputStream getOutputStream(File file) throws IOException {
    +        FileOutputStream fos = new FileOutputStream(file);
    +        OutputStream os;
    +        switch (streamMode) {
    +            case GZIP:
    +                os = new GZIPOutputStream(fos);
    +                break;
    +            case SNAPPY:
    +                os = new SnappyOutputStream(fos);
    +                break;
    +            case CHECKED:
    +            default:
    +                os = new BufferedOutputStream(fos);
    +        }
    +        return new CheckedOutputStream(os, new Adler32());
    +    }
    +
    +    /**
    +     * Write specific seal to the OutputArchive and close the OutputStream.
    +     * Currently, only CheckedOutputStream will write it's checkSum to the
    +     * end of the stream.
    +     *
    +     */
    +    public static void sealStream(CheckedOutputStream os, OutputArchive oa)
    +            throws IOException {
    +        long val = os.getChecksum().getValue();
    +        oa.writeLong(val, "val");
    +        oa.writeString("/", "path");
    +    }
    +
    +    /**
    +     * Verify the integrity of the seal, only CheckedInputStream will verify
    +     * the checkSum of the content.
    +     *
    +     */
    +    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
    +            throws IOException {
    +        long checkSum = is.getChecksum().getValue();
    +        long val = ia.readLong("val");
    +        if (val != checkSum) {
    +            throw new IOException("CRC corruption");
    +        }
    +    }
    +
    +    /**
    +     * Verifies that the file is a valid snapshot. Snapshot may be invalid if
    +     * it's incomplete as in a situation when the server dies while in the
    +     * process of storing a snapshot. Any files that are improperly formated
    +     * or corrupted are invalid. Any file that is not a snapshot is also an
    +     * invalid snapshot.
    +     *
    +     * @param file file to verify
    +     * @return true if the snapshot is valid
    +     * @throws IOException
    +     */
    +    public static boolean isValidSnapshot(File file) throws IOException {
    +        if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
    +            return false;
    +        }
    +
    +        String fileName = file.getName();
    +        if (Util.getZxidFromName(fileName, "snapshot") == -1) {
    +            return false;
    +        }
    +
    +        boolean isValid = false;
    +        switch (getStreamMode(fileName)) {
    +            case GZIP:
    +                isValid = isValidGZipStream(file);
    +                break;
    +            case SNAPPY:
    +                isValid = isValidSnappyStream(file);
    +                break;
    +            case CHECKED:
    +            default:
    +                isValid = isValidCheckedStream(file);
    +        }
    +        return isValid;
    +    }
    +
    +    public static void setStreamMode(StreamMode mode) {
    +        streamMode = mode;
    +    }
    +
    +    public static StreamMode getStreamMode() {
    +        return streamMode;
    +    }
    +
    +    /**
    +     * Detect the stream mode from file name extension
    +     *
    +     * @param fileName
    +     * @return
    +     */
    +    public static StreamMode getStreamMode(String fileName) {
    +        String[] splitSnapName = fileName.split("\\.");
    +
    +        // Use file extension to detect format
    +        if (splitSnapName.length > 1) {
    +            String mode = splitSnapName[splitSnapName.length - 1];
    +            return StreamMode.fromString(mode);
    +        }
    +
    +        return StreamMode.CHECKED;
    +    }
    +
    +    /**
    +     * Certify the GZip stream integrity by checking the header
    +     * for the GZip magic string
    +     *
    +     * @param f file to verify
    +     * @return true if it has the correct GZip magic string
    +     * @throws IOException
    +     */
    +    private static boolean isValidGZipStream(File f) throws IOException {
    +        FileInputStream fis = null;
    +        byte[] byteArray = new byte[2];
    +
    +        try {
    +            fis = new FileInputStream(f);
    +            fis.read(byteArray, 0, 2);
    --- End diff --
    
    This is actually a bug to be fixed.
    You can, for instance, use DataInputStream#readFully, or IOUtils if we have it on the classpath.
    
    In this case, it is enough to fail if the result is != 2.


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    retest this please


---

[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...

Posted by nkalmar <gi...@git.apache.org>.
Github user nkalmar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/690#discussion_r232325878
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
    @@ -0,0 +1,336 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.persistence;
    +
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.RandomAccessFile;
    +import java.nio.ByteBuffer;
    +import java.util.Arrays;
    +import java.util.zip.Adler32;
    +import java.util.zip.CheckedInputStream;
    +import java.util.zip.CheckedOutputStream;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import org.apache.jute.InputArchive;
    +import org.apache.jute.OutputArchive;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.xerial.snappy.SnappyCodec;
    +import org.xerial.snappy.SnappyInputStream;
    +import org.xerial.snappy.SnappyOutputStream;
    +
    +/**
    + * Represent the Stream used in serialize and deserialize the Snapshot.
    + */
    +public class SnapStream {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
    +
    +    public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
    +        "zookeeper.snapshot.compression.method";
    +
    +    private static StreamMode streamMode =
    +        StreamMode.fromString(
    +            System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
    +                  StreamMode.DEFAULT_MODE.getName()));
    +
    +    static {
    +        LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
    +    }
    +
    +    public static enum StreamMode {
    +        GZIP("gz"),
    +        SNAPPY("snappy"),
    +        CHECKED("");
    +
    +        public static final StreamMode DEFAULT_MODE = CHECKED;
    +
    +        private String name;
    +
    +        StreamMode(String name) {
    +           this.name = name;
    +        }
    +
    +        public String getName() {
    +            return name;
    +        }
    +
    +        public String getFileExtension() {
    +            return name.isEmpty() ? "" : "." + name;
    +        }
    +
    +        public static StreamMode fromString(String name) {
    +            for (StreamMode c : values()) {
    +                if (c.getName().compareToIgnoreCase(name) == 0) {
    +                    return c;
    +                }
    +            }
    +            return DEFAULT_MODE;
    +        }
    +    }
    +
    +    /**
    +     * Return the CheckedInputStream based on the extension of the fileName.
    +     *
    +     * @param fileName the file the InputStream read from
    +     * @return the specific InputStream
    +     * @throws IOException
    +     */
    +    public static CheckedInputStream getInputStream(File file) throws IOException {
    +        FileInputStream fis = new FileInputStream(file);
    +        InputStream is;
    +        switch (getStreamMode(file.getName())) {
    +            case GZIP:
    +                is = new GZIPInputStream(fis);
    +                break;
    +            case SNAPPY:
    +                is = new SnappyInputStream(fis);
    +                break;
    +            case CHECKED:
    +            default:
    +                is = new BufferedInputStream(fis);
    +        }
    +        return new CheckedInputStream(is, new Adler32());
    +    }
    +
    +    /**
    +     * Return the OutputStream based on predefined stream mode.
    +     *
    +     * @param fileName the file the OutputStream writes to
    +     * @return the specific OutputStream
    +     * @throws IOException
    +     */
    +    public static CheckedOutputStream getOutputStream(File file) throws IOException {
    +        FileOutputStream fos = new FileOutputStream(file);
    +        OutputStream os;
    +        switch (streamMode) {
    +            case GZIP:
    +                os = new GZIPOutputStream(fos);
    +                break;
    +            case SNAPPY:
    +                os = new SnappyOutputStream(fos);
    +                break;
    +            case CHECKED:
    +            default:
    +                os = new BufferedOutputStream(fos);
    +        }
    +        return new CheckedOutputStream(os, new Adler32());
    +    }
    +
    +    /**
    +     * Write specific seal to the OutputArchive and close the OutputStream.
    +     * Currently, only CheckedOutputStream will write it's checkSum to the
    +     * end of the stream.
    +     *
    +     */
    +    public static void sealStream(CheckedOutputStream os, OutputArchive oa)
    +            throws IOException {
    +        long val = os.getChecksum().getValue();
    +        oa.writeLong(val, "val");
    +        oa.writeString("/", "path");
    +    }
    +
    +    /**
    +     * Verify the integrity of the seal, only CheckedInputStream will verify
    +     * the checkSum of the content.
    +     *
    +     */
    +    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
    +            throws IOException {
    +        long checkSum = is.getChecksum().getValue();
    +        long val = ia.readLong("val");
    +        if (val != checkSum) {
    +            throw new IOException("CRC corruption");
    +        }
    +    }
    +
    +    /**
    +     * Verifies that the file is a valid snapshot. Snapshot may be invalid if
    +     * it's incomplete as in a situation when the server dies while in the
    +     * process of storing a snapshot. Any files that are improperly formated
    +     * or corrupted are invalid. Any file that is not a snapshot is also an
    +     * invalid snapshot.
    +     *
    +     * @param file file to verify
    +     * @return true if the snapshot is valid
    +     * @throws IOException
    +     */
    +    public static boolean isValidSnapshot(File file) throws IOException {
    +        if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
    +            return false;
    +        }
    +
    +        String fileName = file.getName();
    +        if (Util.getZxidFromName(fileName, "snapshot") == -1) {
    +            return false;
    +        }
    +
    +        boolean isValid = false;
    +        switch (getStreamMode(fileName)) {
    +            case GZIP:
    +                isValid = isValidGZipStream(file);
    +                break;
    +            case SNAPPY:
    +                isValid = isValidSnappyStream(file);
    +                break;
    +            case CHECKED:
    +            default:
    +                isValid = isValidCheckedStream(file);
    +        }
    +        return isValid;
    +    }
    +
    +    public static void setStreamMode(StreamMode mode) {
    +        streamMode = mode;
    +    }
    +
    +    public static StreamMode getStreamMode() {
    +        return streamMode;
    +    }
    +
    +    /**
    +     * Detect the stream mode from file name extension
    +     *
    +     * @param fileName
    +     * @return
    +     */
    +    public static StreamMode getStreamMode(String fileName) {
    +        String[] splitSnapName = fileName.split("\\.");
    +
    +        // Use file extension to detect format
    +        if (splitSnapName.length > 1) {
    +            String mode = splitSnapName[splitSnapName.length - 1];
    +            return StreamMode.fromString(mode);
    +        }
    +
    +        return StreamMode.CHECKED;
    +    }
    +
    +    /**
    +     * Certify the GZip stream integrity by checking the header
    +     * for the GZip magic string
    +     *
    +     * @param f file to verify
    +     * @return true if it has the correct GZip magic string
    +     * @throws IOException
    +     */
    +    private static boolean isValidGZipStream(File f) throws IOException {
    +        FileInputStream fis = null;
    +        byte[] byteArray = new byte[2];
    +
    +        try {
    +            fis = new FileInputStream(f);
    +            fis.read(byteArray, 0, 2);
    +            ByteBuffer bb = ByteBuffer.wrap(byteArray);
    +            byte[] magicHeader = new byte[2];
    +            bb.get(magicHeader, 0, 2);
    +            int magic = magicHeader[0] & 0xff | ((magicHeader[1] << 8) & 0xff00);
    +            return magic == GZIPInputStream.GZIP_MAGIC;
    +        } catch (FileNotFoundException e) {
    +            LOG.error("Unable to open file " + f.getName() + " : ", e);
    +            return false;
    +        } finally {
    +            if (fis != null) {
    +                fis.close();
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Certify the Snappy stream integrity by checking the header
    +     * for the Snappy magic string
    +     *
    +     * @param f file to verify
    +     * @return true if it has the correct Snappy magic string
    +     * @throws IOException
    +     */
    +    private static boolean isValidSnappyStream(File f) throws IOException {
    +        FileInputStream fis = null;
    +        byte[] byteArray = new byte[SnappyCodec.MAGIC_LEN];
    +        try {
    +            fis = new FileInputStream(f);
    +            fis.read(byteArray, 0, SnappyCodec.MAGIC_LEN);
    --- End diff --
    
    Again, findBugs doesn't like it. See above.


---

[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...

Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/690#discussion_r232864632
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
    @@ -0,0 +1,336 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.persistence;
    +
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.RandomAccessFile;
    +import java.nio.ByteBuffer;
    +import java.util.Arrays;
    +import java.util.zip.Adler32;
    +import java.util.zip.CheckedInputStream;
    +import java.util.zip.CheckedOutputStream;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import org.apache.jute.InputArchive;
    +import org.apache.jute.OutputArchive;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.xerial.snappy.SnappyCodec;
    +import org.xerial.snappy.SnappyInputStream;
    +import org.xerial.snappy.SnappyOutputStream;
    +
    +/**
    + * Represent the Stream used in serialize and deserialize the Snapshot.
    + */
    +public class SnapStream {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
    +
    +    public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
    +        "zookeeper.snapshot.compression.method";
    +
    +    private static StreamMode streamMode =
    +        StreamMode.fromString(
    +            System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
    +                  StreamMode.DEFAULT_MODE.getName()));
    +
    +    static {
    +        LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
    +    }
    +
    +    public static enum StreamMode {
    +        GZIP("gz"),
    +        SNAPPY("snappy"),
    +        CHECKED("");
    +
    +        public static final StreamMode DEFAULT_MODE = CHECKED;
    +
    +        private String name;
    +
    +        StreamMode(String name) {
    +           this.name = name;
    +        }
    +
    +        public String getName() {
    +            return name;
    +        }
    +
    +        public String getFileExtension() {
    +            return name.isEmpty() ? "" : "." + name;
    +        }
    +
    +        public static StreamMode fromString(String name) {
    +            for (StreamMode c : values()) {
    +                if (c.getName().compareToIgnoreCase(name) == 0) {
    +                    return c;
    +                }
    +            }
    +            return DEFAULT_MODE;
    +        }
    +    }
    +
    +    /**
    +     * Return the CheckedInputStream based on the extension of the fileName.
    +     *
    +     * @param fileName the file the InputStream read from
    +     * @return the specific InputStream
    +     * @throws IOException
    +     */
    +    public static CheckedInputStream getInputStream(File file) throws IOException {
    +        FileInputStream fis = new FileInputStream(file);
    +        InputStream is;
    +        switch (getStreamMode(file.getName())) {
    +            case GZIP:
    +                is = new GZIPInputStream(fis);
    +                break;
    +            case SNAPPY:
    +                is = new SnappyInputStream(fis);
    +                break;
    +            case CHECKED:
    +            default:
    +                is = new BufferedInputStream(fis);
    +        }
    +        return new CheckedInputStream(is, new Adler32());
    +    }
    +
    +    /**
    +     * Return the OutputStream based on predefined stream mode.
    +     *
    +     * @param fileName the file the OutputStream writes to
    +     * @return the specific OutputStream
    +     * @throws IOException
    +     */
    +    public static CheckedOutputStream getOutputStream(File file) throws IOException {
    +        FileOutputStream fos = new FileOutputStream(file);
    +        OutputStream os;
    +        switch (streamMode) {
    +            case GZIP:
    +                os = new GZIPOutputStream(fos);
    +                break;
    +            case SNAPPY:
    +                os = new SnappyOutputStream(fos);
    +                break;
    +            case CHECKED:
    +            default:
    +                os = new BufferedOutputStream(fos);
    +        }
    +        return new CheckedOutputStream(os, new Adler32());
    +    }
    +
    +    /**
    +     * Write specific seal to the OutputArchive and close the OutputStream.
    +     * Currently, only CheckedOutputStream will write it's checkSum to the
    +     * end of the stream.
    +     *
    +     */
    +    public static void sealStream(CheckedOutputStream os, OutputArchive oa)
    +            throws IOException {
    +        long val = os.getChecksum().getValue();
    +        oa.writeLong(val, "val");
    +        oa.writeString("/", "path");
    +    }
    +
    +    /**
    +     * Verify the integrity of the seal, only CheckedInputStream will verify
    +     * the checkSum of the content.
    +     *
    +     */
    +    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
    +            throws IOException {
    +        long checkSum = is.getChecksum().getValue();
    +        long val = ia.readLong("val");
    +        if (val != checkSum) {
    +            throw new IOException("CRC corruption");
    +        }
    +    }
    +
    +    /**
    +     * Verifies that the file is a valid snapshot. Snapshot may be invalid if
    +     * it's incomplete as in a situation when the server dies while in the
    +     * process of storing a snapshot. Any files that are improperly formated
    +     * or corrupted are invalid. Any file that is not a snapshot is also an
    +     * invalid snapshot.
    +     *
    +     * @param file file to verify
    +     * @return true if the snapshot is valid
    +     * @throws IOException
    +     */
    +    public static boolean isValidSnapshot(File file) throws IOException {
    +        if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
    +            return false;
    +        }
    +
    +        String fileName = file.getName();
    +        if (Util.getZxidFromName(fileName, "snapshot") == -1) {
    +            return false;
    +        }
    +
    +        boolean isValid = false;
    +        switch (getStreamMode(fileName)) {
    +            case GZIP:
    +                isValid = isValidGZipStream(file);
    +                break;
    +            case SNAPPY:
    +                isValid = isValidSnappyStream(file);
    +                break;
    +            case CHECKED:
    +            default:
    +                isValid = isValidCheckedStream(file);
    +        }
    +        return isValid;
    +    }
    +
    +    public static void setStreamMode(StreamMode mode) {
    +        streamMode = mode;
    +    }
    +
    +    public static StreamMode getStreamMode() {
    +        return streamMode;
    +    }
    +
    +    /**
    +     * Detect the stream mode from file name extension
    +     *
    +     * @param fileName
    +     * @return
    +     */
    +    public static StreamMode getStreamMode(String fileName) {
    +        String[] splitSnapName = fileName.split("\\.");
    +
    +        // Use file extension to detect format
    +        if (splitSnapName.length > 1) {
    +            String mode = splitSnapName[splitSnapName.length - 1];
    +            return StreamMode.fromString(mode);
    +        }
    +
    +        return StreamMode.CHECKED;
    +    }
    +
    +    /**
    +     * Certify the GZip stream integrity by checking the header
    +     * for the GZip magic string
    +     *
    +     * @param f file to verify
    +     * @return true if it has the correct GZip magic string
    +     * @throws IOException
    +     */
    +    private static boolean isValidGZipStream(File f) throws IOException {
    +        FileInputStream fis = null;
    +        byte[] byteArray = new byte[2];
    +
    +        try {
    +            fis = new FileInputStream(f);
    +            fis.read(byteArray, 0, 2);
    --- End diff --
    
    Fixed Findbugs warnings and changed to using try-with-resources!


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    @anmolnar 
    I agree that concrete classes are a lot easier to test. Though in this case, I think it's more appropriate for SnapStream to be a utility class that does not hold any state. (It contains a global, non-changing "state" `streamMode` now. It's a bit less ideal, but avoids having to pass in the same streamMode value every time.)
    For classes that need to hold some mutable state, I agree that concrete class is the way to go.


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2609/
    --none----none--


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/PreCommit-ZOOKEEPER-github-pr-build/2606/



---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    @yisong-yue 
    Yeah, that's true. Perhaps it would be too much hassle and I'm trying to over-engineer things here. Let's just leave it as it is now and only do refactoring if we could benefit from it.


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by anmolnar <gi...@git.apache.org>.
Github user anmolnar commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    @yisong-yue 
    Global state can still be maintained in a static field of a concrete class, it shouldn't be a problem.
    
    Though, you're probably right that `SnapStream` cannot maintain state on its own; I believe that's because the methods should actually belong to `FileSnap`. Taking one step back and looking at how `FileSnap` works, I think - in terms of inheritance - the following would be the best way to represent the relationships:
    
    ```
    SnapShot (interface) - represents a single snapshot of the DataTree
    |
    \\ _ FileSnap (abstract) - implements common methods for all file-based snapshots
        |
        \\ _ GzipFileSnap (concrete) - Gzip-compressed file-based snapshot
        |
        \\ _ SnappyFileSnap (concrete) - Snappy compressed file-based snapshot
        |
        ...
    ```
    
    In this example, your new methods should go into the right concrete class whether it's Gzip, Snappy, etc. related or in the abstract `FileSnap` class if it's some common method.
    
    Of course, this would require a bit of refactoring, which might not be trivial, so I wouldn't say we have to do it. I just wanted to give you an idea and get some feedback from you and the community. 


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    👍 Thanks for the feedback! 😄


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by yisong-yue <gi...@git.apache.org>.
Github user yisong-yue commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    Here are some benchmark results I ran on my laptop:
    ```
    (1)
    			uncompressed		snappy			gzip
    Size:			~ 15.9 MB		~ 4.1 MB		~ 0.3 MB
    Serialize (run 1):	110 ms			36 ms			134 ms
    Serialize (run 2):	96 ms			33 ms			135 ms
    Deserialize (run 1):	40 ms			25 ms			76 ms
    Deserialize (run 2):	39 ms			26 ms			74 ms
    
    (2)
    			uncompressed		snappy			gzip
    Size:			~ 382 MB		~ 139 MB		~ 104 MB
    Serialize (run 1):	3393 ms			3279 ms			22096 ms
    Serialize (run 2):	3177 ms			3223 ms			24729 ms
    Deserialize (run 1):	4239 ms			3532 ms			14122 ms
    Deserialize (run 2):	5750 ms			4722 ms			16654 ms
    
    (3)
    			uncompressed		snappy			gzip
    Size:			~ 1.2 GB		~ 560 MB		~ 330 MB
    Serialize (run 1):	24250 ms		26575 ms		208570 ms
    Serialize (run 2):	23717 ms		25648 ms		197400 ms
    Deserialize (run 1):	51510 ms		67274 ms		189142 ms
    Deserialize (run 2):	50185 ms		64691 ms		181521 ms
    ```


---

[GitHub] zookeeper issue #690: ZOOKEEPER-3179: Add snapshot compression to reduce the...

Posted by eolivelli <gi...@git.apache.org>.
Github user eolivelli commented on the issue:

    https://github.com/apache/zookeeper/pull/690
  
    retest this please


---

[GitHub] zookeeper pull request #690: ZOOKEEPER-3179: Add snapshot compression to red...

Posted by nkalmar <gi...@git.apache.org>.
Github user nkalmar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/690#discussion_r232325666
  
    --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java ---
    @@ -0,0 +1,336 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.zookeeper.server.persistence;
    +
    +import java.io.BufferedInputStream;
    +import java.io.BufferedOutputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileNotFoundException;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.io.RandomAccessFile;
    +import java.nio.ByteBuffer;
    +import java.util.Arrays;
    +import java.util.zip.Adler32;
    +import java.util.zip.CheckedInputStream;
    +import java.util.zip.CheckedOutputStream;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import org.apache.jute.InputArchive;
    +import org.apache.jute.OutputArchive;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.xerial.snappy.SnappyCodec;
    +import org.xerial.snappy.SnappyInputStream;
    +import org.xerial.snappy.SnappyOutputStream;
    +
    +/**
    + * Represent the Stream used in serialize and deserialize the Snapshot.
    + */
    +public class SnapStream {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SnapStream.class);
    +
    +    public static final String ZOOKEEPER_SHAPSHOT_STREAM_MODE =
    +        "zookeeper.snapshot.compression.method";
    +
    +    private static StreamMode streamMode =
    +        StreamMode.fromString(
    +            System.getProperty(ZOOKEEPER_SHAPSHOT_STREAM_MODE,
    +                  StreamMode.DEFAULT_MODE.getName()));
    +
    +    static {
    +        LOG.info(ZOOKEEPER_SHAPSHOT_STREAM_MODE + "=" + streamMode);
    +    }
    +
    +    public static enum StreamMode {
    +        GZIP("gz"),
    +        SNAPPY("snappy"),
    +        CHECKED("");
    +
    +        public static final StreamMode DEFAULT_MODE = CHECKED;
    +
    +        private String name;
    +
    +        StreamMode(String name) {
    +           this.name = name;
    +        }
    +
    +        public String getName() {
    +            return name;
    +        }
    +
    +        public String getFileExtension() {
    +            return name.isEmpty() ? "" : "." + name;
    +        }
    +
    +        public static StreamMode fromString(String name) {
    +            for (StreamMode c : values()) {
    +                if (c.getName().compareToIgnoreCase(name) == 0) {
    +                    return c;
    +                }
    +            }
    +            return DEFAULT_MODE;
    +        }
    +    }
    +
    +    /**
    +     * Return the CheckedInputStream based on the extension of the fileName.
    +     *
    +     * @param fileName the file the InputStream read from
    +     * @return the specific InputStream
    +     * @throws IOException
    +     */
    +    public static CheckedInputStream getInputStream(File file) throws IOException {
    +        FileInputStream fis = new FileInputStream(file);
    +        InputStream is;
    +        switch (getStreamMode(file.getName())) {
    +            case GZIP:
    +                is = new GZIPInputStream(fis);
    +                break;
    +            case SNAPPY:
    +                is = new SnappyInputStream(fis);
    +                break;
    +            case CHECKED:
    +            default:
    +                is = new BufferedInputStream(fis);
    +        }
    +        return new CheckedInputStream(is, new Adler32());
    +    }
    +
    +    /**
    +     * Creates (or truncates) {@code file} and wraps it according to the current
    +     * write mode (see {@link #setStreamMode(StreamMode)}). The returned stream
    +     * computes an Adler32 checksum over the bytes written to it, i.e. before
    +     * any compression is applied.
    +     *
    +     * @param file the file the OutputStream writes to
    +     * @return a CheckedOutputStream over the (possibly compressing) file stream
    +     * @throws IOException if the file cannot be opened or a compressed stream
    +     *         cannot be initialized; in the latter case the underlying file
    +     *         stream is closed before the exception propagates
    +     */
    +    public static CheckedOutputStream getOutputStream(File file) throws IOException {
    +        FileOutputStream fos = new FileOutputStream(file);
    +        OutputStream os;
    +        try {
    +            switch (streamMode) {
    +                case GZIP:
    +                    // The GZIP constructor writes the stream header and may throw.
    +                    os = new GZIPOutputStream(fos);
    +                    break;
    +                case SNAPPY:
    +                    os = new SnappyOutputStream(fos);
    +                    break;
    +                case CHECKED:
    +                default:
    +                    os = new BufferedOutputStream(fos);
    +            }
    +        } catch (IOException | RuntimeException e) {
    +            // Don't leak the file descriptor when a wrapper constructor fails.
    +            try {
    +                fos.close();
    +            } catch (IOException closeFailure) {
    +                e.addSuppressed(closeFailure);
    +            }
    +            throw e;
    +        }
    +        return new CheckedOutputStream(os, new Adler32());
    +    }
    +
    +    /**
    +     * Writes the trailing seal to the OutputArchive. The seal is the current
    +     * value of the stream's running checksum (tag "val"), followed by the
    +     * path "/" (tag "path").
    +     *
    +     * <p>This method does NOT close or flush {@code os}; the caller remains
    +     * responsible for that.
    +     *
    +     * @param os the stream whose running checksum covers everything written so far
    +     * @param oa the archive the seal is written to
    +     * @throws IOException if writing to the archive fails
    +     */
    +    public static void sealStream(CheckedOutputStream os, OutputArchive oa)
    +            throws IOException {
    +        // Read the checksum before writing it: if oa writes through os (as it
    +        // presumably does), writing first would alter the value being recorded.
    +        long val = os.getChecksum().getValue();
    +        oa.writeLong(val, "val");
    +        // NOTE(review): checkSealIntegrity only reads "val"; presumably the
    +        // snapshot reader consumes this "/" entry separately -- confirm.
    +        oa.writeString("/", "path");
    +    }
    +
    +    /**
    +     * Verify the integrity of the seal, only CheckedInputStream will verify
    +     * the checkSum of the content.
    +     *
    +     */
    +    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia)
    +            throws IOException {
    +        long checkSum = is.getChecksum().getValue();
    +        long val = ia.readLong("val");
    +        if (val != checkSum) {
    +            throw new IOException("CRC corruption");
    +        }
    +    }
    +
    +    /**
    +     * Verifies that the file is a valid snapshot. A snapshot may be invalid if
    +     * it is incomplete, e.g. when the server dies while storing it. Any file
    +     * that is improperly formatted or corrupted is invalid, as is any file
    +     * whose name does not identify it as a snapshot.
    +     *
    +     * <p>The validation performed depends on the stream mode detected from the
    +     * file name's extension.
    +     *
    +     * @param file file to verify; {@code null} is treated as invalid
    +     * @return true if the snapshot is valid
    +     * @throws IOException if reading the file during validation fails
    +     */
    +    public static boolean isValidSnapshot(File file) throws IOException {
    +        // Reject anything whose name does not carry a snapshot zxid.
    +        if (file == null || Util.getZxidFromName(file.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1) {
    +            return false;
    +        }
    +
    +        switch (getStreamMode(file.getName())) {
    +            case GZIP:
    +                return isValidGZipStream(file);
    +            case SNAPPY:
    +                return isValidSnappyStream(file);
    +            case CHECKED:
    +            default:
    +                return isValidCheckedStream(file);
    +        }
    +    }
    +
    +    /**
    +     * Sets the stream mode that {@link #getOutputStream(File)} uses to wrap
    +     * the snapshot files it creates.
    +     *
    +     * @param mode the mode to use for subsequently created output streams
    +     */
    +    public static void setStreamMode(final StreamMode mode) {
    +        streamMode = mode;
    +    }
    +
    +    /**
    +     * Returns the stream mode that {@link #getOutputStream(File)} currently
    +     * uses to wrap the snapshot files it creates.
    +     *
    +     * @return the configured write mode
    +     */
    +    public static StreamMode getStreamMode() {
    +        final StreamMode current = streamMode;
    +        return current;
    +    }
    +
    +    /**
    +     * Detects the stream mode from a file name's extension, i.e. the last
    +     * non-empty '.'-separated part of the name.
    +     *
    +     * <p>Only extensions equal (ignoring case) to the name of a mode are
    +     * recognized. Any other name maps to {@link StreamMode#CHECKED}; this
    +     * includes a name whose last part is a zxid, or one with no '.' at all.
    +     * This deliberately does not rely on the {@code DEFAULT_MODE} fallback of
    +     * {@link StreamMode#fromString(String)}. Otherwise, changing that default
    +     * would change how existing uncompressed files are detected.
    +     *
    +     * @param fileName the file name, as returned by {@link File#getName()}
    +     * @return the detected stream mode; never {@code null}
    +     */
    +    public static StreamMode getStreamMode(String fileName) {
    +        String[] splitSnapName = fileName.split("\\.");
    +
    +        // Use file extension to detect format
    +        if (splitSnapName.length > 1) {
    +            String extension = splitSnapName[splitSnapName.length - 1];
    +            for (StreamMode mode : StreamMode.values()) {
    +                // A mode with an empty name has no extension to detect.
    +                if (!mode.getName().isEmpty() && mode.getName().equalsIgnoreCase(extension)) {
    +                    return mode;
    +                }
    +            }
    +        }
    +
    +        return StreamMode.CHECKED;
    +    }
    +
    +    /**
    +     * Certify the GZip stream integrity by checking the header
    +     * for the GZip magic string
    +     *
    +     * @param f file to verify
    +     * @return true if it has the correct GZip magic string
    +     * @throws IOException
    +     */
    +    private static boolean isValidGZipStream(File f) throws IOException {
    +        FileInputStream fis = null;
    +        byte[] byteArray = new byte[2];
    +
    +        try {
    +            fis = new FileInputStream(f);
    +            fis.read(byteArray, 0, 2);
    --- End diff --
    
    FindBugs reports this because the result of read() is ignored.
    Please add it to findbugsExcludeFile.xml to suppress the warning (or use the return value to check whether the read succeeded here; that is not strictly necessary, though).


---