You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@orc.apache.org by omalley <gi...@git.apache.org> on 2018/06/03 19:57:37 UTC

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

GitHub user omalley opened a pull request:

    https://github.com/apache/orc/pull/278

    ORC-251: Extend InStream and OutStream to support encryption.

    This patch:
    * Adds a method to Codec to get the CompressionKind.
    * Creates StreamOptions for both InStream and OutStream to gather together the parameters they need.
    * Extends InStream and OutStream to handle encryption.
    * Changes InStream to use DiskRangeList instead of List<DiskRange>.
    * Creates CryptoUtils with a method to create an IV based on the stream name.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/omalley/orc orc-251

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/orc/pull/278.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #278
    
----
commit ff620c3faccbabc7e011e245dfb3bcdbfef41b7a
Author: Owen O'Malley <om...@...>
Date:   2018-05-09T16:36:28Z

    ORC-251: Extend InStream and OutStream to support encryption.

----


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202473870
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -401,84 +596,149 @@ private String rangeString() {
         @Override
         public String toString() {
           return "compressed stream " + name + " position: " + currentOffset +
    -          " length: " + length + " range: " + currentRange +
    -          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
    +          " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
    +          " offset: " + (compressed == null ? 0 : compressed.position()) +
    +          " limit: " + (compressed == null ? 0 : compressed.limit()) +
               rangeString() +
               (uncompressed == null ? "" :
                   " uncompressed: " + uncompressed.position() + " to " +
                       uncompressed.limit());
         }
       }
     
    +  private static class EncryptedCompressedStream extends CompressedStream {
    +    private final EncryptionState encrypt;
    +
    +    public EncryptedCompressedStream(String name,
    +                                     DiskRangeList input,
    +                                     long length,
    +                                     StreamOptions options) {
    +      super(name, length, options);
    +      encrypt = new EncryptionState(name, options);
    +      reset(input, length);
    +    }
    +
    +    @Override
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        if (isJump) {
    +          encrypt.changeIv(newRange.getOffset());
    +        }
    +        compressed = encrypt.decrypt(newRange);
    +        compressed.position((int) (currentOffset - newRange.getOffset()));
    +      }
    +    }
    +
    +    @Override
    +    public void close() {
    +      super.close();
    +      encrypt.close();
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return "encrypted " + super.toString();
    +    }
    +  }
    +
       public abstract void seek(PositionProvider index) throws IOException;
     
    +  public static class StreamOptions implements Cloneable {
    +    private CompressionCodec codec;
    +    private int bufferSize;
    +    private EncryptionAlgorithm algorithm;
    +    private Key key;
    +    private byte[] iv;
    +
    +    public StreamOptions withCodec(CompressionCodec value) {
    +      this.codec = value;
    +      return this;
    +    }
    +
    +    public StreamOptions withBufferSize(int value) {
    +      bufferSize = value;
    +      return this;
    +    }
    +
    +    public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
    +                                        Key key,
    +                                        byte[] iv) {
    +      this.algorithm = algorithm;
    +      this.key = key;
    +      this.iv = iv;
    +      return this;
    +    }
    +
    +    public CompressionCodec getCodec() {
    +      return codec;
    +    }
    +
    +    @Override
    +    public StreamOptions clone() {
    +      try {
    +        StreamOptions clone = (StreamOptions) super.clone();
    +        if (clone.codec != null) {
    +          // Make sure we don't share the same codec between two readers.
    +          clone.codec = OrcCodecPool.getCodec(codec.getKind());
    +        }
    +        return clone;
    --- End diff --
    
    *sigh* It is Cloneable, because it is used as a field in DefaultDataReader. The clone is correct, I believe. We really need a better solution for CompressionCodec than the current Cloneable and CompressionCodecPool mess. We need to remove the compression parameters out of Codec and then they are completely stateless again and can be trivially pooled.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202188844
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -173,33 +203,208 @@ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
         }
       }
     
    +  /**
    +   * Manage the state of the decryption, including the ability to seek.
    +   */
    +  static class EncryptionState {
    +    private final String name;
    +    private final EncryptionAlgorithm algorithm;
    +    private final Key key;
    +    private final byte[] iv;
    +    private final Cipher cipher;
    +    private ByteBuffer decrypted;
    +
    +    EncryptionState(String name, StreamOptions options) {
    +      this.name = name;
    +      algorithm = options.algorithm;
    +      key = options.key;
    +      iv = options.iv;
    +      cipher = algorithm.createCipher();
    +    }
    +
    +    /**
    +     * We are seeking to a new range, so update the cipher to change the IV
    +     * to match. This code assumes that we only support encryption in CTR mode.
    --- End diff --
    
    Comment about how the bottom 8 bytes of IV will be filled (block counter) will be helpful.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/orc/pull/278


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202146043
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -55,112 +60,137 @@ public long getStreamLength() {
       @Override
       public abstract void close();
     
    +  static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
    +    int result = 0;
    +    DiskRangeList range = list;
    +    while (range != null && range != current) {
    +      result += 1;
    +      range = range.next;
    +    }
    +    return result;
    +  }
    +
    +  /**
    +   * Implements a stream over an uncompressed stream.
    +   */
       public static class UncompressedStream extends InStream {
    -    private List<DiskRange> bytes;
    +    private DiskRangeList bytes;
         private long length;
         protected long currentOffset;
    -    private ByteBuffer range;
    -    private int currentRange;
    +    protected ByteBuffer decrypted;
    +    protected DiskRangeList currentRange;
    +
    +    /**
    +     * Create the stream without calling reset on it.
    +     * This is used for the subclass that needs to do more setup.
    +     * @param name name of the stream
    +     * @param length the number of bytes for the stream
    +     */
    +    public UncompressedStream(String name, long length) {
    +      super(name, length);
    +    }
     
    -    public UncompressedStream(String name, List<DiskRange> input, long length) {
    +    public UncompressedStream(String name,
    +                              DiskRangeList input,
    +                              long length) {
           super(name, length);
           reset(input, length);
         }
     
    -    protected void reset(List<DiskRange> input, long length) {
    +    protected void reset(DiskRangeList input, long length) {
           this.bytes = input;
           this.length = length;
    -      currentRange = 0;
    -      currentOffset = 0;
    -      range = null;
    +      currentOffset = input == null ? 0 : input.getOffset();
    +      setCurrent(input, true);
         }
     
         @Override
         public int read() {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
           currentOffset += 1;
    -      return 0xff & range.get();
    +      return 0xff & decrypted.get();
    +    }
    +
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        decrypted = newRange.getData().slice();
    +        decrypted.position((int) (currentOffset - newRange.getOffset()));
    +      }
         }
     
         @Override
         public int read(byte[] data, int offset, int length) {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == this.length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
    -      int actualLength = Math.min(length, range.remaining());
    -      range.get(data, offset, actualLength);
    +      int actualLength = Math.min(length, decrypted.remaining());
    +      decrypted.get(data, offset, actualLength);
           currentOffset += actualLength;
           return actualLength;
         }
     
         @Override
         public int available() {
    -      if (range != null && range.remaining() > 0) {
    -        return range.remaining();
    +      if (decrypted != null && decrypted.remaining() > 0) {
    +        return decrypted.remaining();
           }
           return (int) (length - currentOffset);
         }
     
         @Override
         public void close() {
    -      currentRange = bytes.size();
    +      currentRange = null;
           currentOffset = length;
           // explicit de-ref of bytes[]
    -      bytes.clear();
    +      decrypted = null;
    +      bytes = null;
         }
     
         @Override
         public void seek(PositionProvider index) throws IOException {
           seek(index.getNext());
         }
     
    -    public void seek(long desired) {
    -      if (desired == 0 && bytes.isEmpty()) {
    +    public void seek(long desired) throws IOException {
    +      if (desired == 0 && bytes == null) {
             return;
           }
    -      int i = 0;
    -      for (DiskRange curRange : bytes) {
    -        if (curRange.getOffset() <= desired &&
    -            (desired - curRange.getOffset()) < curRange.getLength()) {
    -          currentOffset = desired;
    -          currentRange = i;
    -          this.range = curRange.getData().duplicate();
    -          int pos = range.position();
    -          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
    -          this.range.position(pos);
    -          return;
    -        }
    -        ++i;
    -      }
    -      // if they are seeking to the precise end, go ahead and let them go there
    -      int segments = bytes.size();
    -      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
    +      // If we are seeking inside of the current range, just reposition.
    +      if (currentRange != null && desired >= currentRange.getOffset() &&
    +          desired < currentRange.getEnd()) {
    --- End diff --
    
    <= currentRange.getEnd()?


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202430812
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -55,112 +60,137 @@ public long getStreamLength() {
       @Override
       public abstract void close();
     
    +  static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
    +    int result = 0;
    +    DiskRangeList range = list;
    +    while (range != null && range != current) {
    +      result += 1;
    +      range = range.next;
    +    }
    +    return result;
    +  }
    +
    +  /**
    +   * Implements a stream over an uncompressed stream.
    +   */
       public static class UncompressedStream extends InStream {
    -    private List<DiskRange> bytes;
    +    private DiskRangeList bytes;
         private long length;
         protected long currentOffset;
    -    private ByteBuffer range;
    -    private int currentRange;
    +    protected ByteBuffer decrypted;
    +    protected DiskRangeList currentRange;
    +
    +    /**
    +     * Create the stream without calling reset on it.
    +     * This is used for the subclass that needs to do more setup.
    +     * @param name name of the stream
    +     * @param length the number of bytes for the stream
    +     */
    +    public UncompressedStream(String name, long length) {
    +      super(name, length);
    +    }
     
    -    public UncompressedStream(String name, List<DiskRange> input, long length) {
    +    public UncompressedStream(String name,
    +                              DiskRangeList input,
    +                              long length) {
           super(name, length);
           reset(input, length);
         }
     
    -    protected void reset(List<DiskRange> input, long length) {
    +    protected void reset(DiskRangeList input, long length) {
           this.bytes = input;
           this.length = length;
    -      currentRange = 0;
    -      currentOffset = 0;
    -      range = null;
    +      currentOffset = input == null ? 0 : input.getOffset();
    +      setCurrent(input, true);
         }
     
         @Override
         public int read() {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
           currentOffset += 1;
    -      return 0xff & range.get();
    +      return 0xff & decrypted.get();
    +    }
    +
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        decrypted = newRange.getData().slice();
    +        decrypted.position((int) (currentOffset - newRange.getOffset()));
    --- End diff --
    
    No, it is moving the ByteBuffer position to the correct spot so that it matches currentOffset, which is relative to the stream.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202431172
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -55,112 +60,137 @@ public long getStreamLength() {
       @Override
       public abstract void close();
     
    +  static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
    +    int result = 0;
    +    DiskRangeList range = list;
    +    while (range != null && range != current) {
    +      result += 1;
    +      range = range.next;
    +    }
    +    return result;
    +  }
    +
    +  /**
    +   * Implements a stream over an uncompressed stream.
    +   */
       public static class UncompressedStream extends InStream {
    -    private List<DiskRange> bytes;
    +    private DiskRangeList bytes;
         private long length;
         protected long currentOffset;
    -    private ByteBuffer range;
    -    private int currentRange;
    +    protected ByteBuffer decrypted;
    +    protected DiskRangeList currentRange;
    +
    +    /**
    +     * Create the stream without calling reset on it.
    +     * This is used for the subclass that needs to do more setup.
    +     * @param name name of the stream
    +     * @param length the number of bytes for the stream
    +     */
    +    public UncompressedStream(String name, long length) {
    +      super(name, length);
    +    }
     
    -    public UncompressedStream(String name, List<DiskRange> input, long length) {
    +    public UncompressedStream(String name,
    +                              DiskRangeList input,
    +                              long length) {
           super(name, length);
           reset(input, length);
         }
     
    -    protected void reset(List<DiskRange> input, long length) {
    +    protected void reset(DiskRangeList input, long length) {
           this.bytes = input;
           this.length = length;
    -      currentRange = 0;
    -      currentOffset = 0;
    -      range = null;
    +      currentOffset = input == null ? 0 : input.getOffset();
    +      setCurrent(input, true);
         }
     
         @Override
         public int read() {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
           currentOffset += 1;
    -      return 0xff & range.get();
    +      return 0xff & decrypted.get();
    +    }
    +
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        decrypted = newRange.getData().slice();
    +        decrypted.position((int) (currentOffset - newRange.getOffset()));
    --- End diff --
    
    But I'll add a comment to the code.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202470866
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -173,33 +203,208 @@ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
         }
       }
     
    +  /**
    +   * Manage the state of the decryption, including the ability to seek.
    +   */
    +  static class EncryptionState {
    +    private final String name;
    +    private final EncryptionAlgorithm algorithm;
    +    private final Key key;
    +    private final byte[] iv;
    +    private final Cipher cipher;
    +    private ByteBuffer decrypted;
    +
    +    EncryptionState(String name, StreamOptions options) {
    +      this.name = name;
    +      algorithm = options.algorithm;
    +      key = options.key;
    +      iv = options.iv;
    +      cipher = algorithm.createCipher();
    +    }
    +
    +    /**
    +     * We are seeking to a new range, so update the cipher to change the IV
    +     * to match. This code assumes that we only support encryption in CTR mode.
    +     * @param offset where we are seeking to in the stream
    +     */
    +    void changeIv(long offset) {
    +      int blockSize = cipher.getBlockSize();
    +      long encryptionBlocks = offset / blockSize;
    --- End diff --
    
    No, we always start decryption on a block boundary and throw away the leading bytes. See InStream.EncryptionState.changeIv(long offset) and the "extra" and "wasted" variables.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202189630
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -401,84 +596,149 @@ private String rangeString() {
         @Override
         public String toString() {
           return "compressed stream " + name + " position: " + currentOffset +
    -          " length: " + length + " range: " + currentRange +
    -          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
    +          " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
    +          " offset: " + (compressed == null ? 0 : compressed.position()) +
    +          " limit: " + (compressed == null ? 0 : compressed.limit()) +
               rangeString() +
               (uncompressed == null ? "" :
                   " uncompressed: " + uncompressed.position() + " to " +
                       uncompressed.limit());
         }
       }
     
    +  private static class EncryptedCompressedStream extends CompressedStream {
    +    private final EncryptionState encrypt;
    +
    +    public EncryptedCompressedStream(String name,
    +                                     DiskRangeList input,
    +                                     long length,
    +                                     StreamOptions options) {
    +      super(name, length, options);
    +      encrypt = new EncryptionState(name, options);
    +      reset(input, length);
    +    }
    +
    +    @Override
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        if (isJump) {
    +          encrypt.changeIv(newRange.getOffset());
    +        }
    +        compressed = encrypt.decrypt(newRange);
    +        compressed.position((int) (currentOffset - newRange.getOffset()));
    +      }
    +    }
    +
    +    @Override
    +    public void close() {
    +      super.close();
    +      encrypt.close();
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return "encrypted " + super.toString();
    +    }
    +  }
    +
       public abstract void seek(PositionProvider index) throws IOException;
     
    +  public static class StreamOptions implements Cloneable {
    +    private CompressionCodec codec;
    +    private int bufferSize;
    +    private EncryptionAlgorithm algorithm;
    +    private Key key;
    +    private byte[] iv;
    +
    +    public StreamOptions withCodec(CompressionCodec value) {
    +      this.codec = value;
    +      return this;
    +    }
    +
    +    public StreamOptions withBufferSize(int value) {
    +      bufferSize = value;
    +      return this;
    +    }
    +
    +    public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
    +                                        Key key,
    +                                        byte[] iv) {
    +      this.algorithm = algorithm;
    +      this.key = key;
    +      this.iv = iv;
    +      return this;
    +    }
    +
    +    public CompressionCodec getCodec() {
    +      return codec;
    +    }
    +
    +    @Override
    +    public StreamOptions clone() {
    +      try {
    +        StreamOptions clone = (StreamOptions) super.clone();
    +        if (clone.codec != null) {
    +          // Make sure we don't share the same codec between two readers.
    +          clone.codec = OrcCodecPool.getCodec(codec.getKind());
    +        }
    +        return clone;
    --- End diff --
    
    why is this clone required? incomplete clone anyways


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202433617
  
    --- Diff: java/core/src/java/org/apache/orc/impl/CryptoUtils.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.orc.impl;
    +
    +import org.apache.orc.EncryptionAlgorithm;
    +import java.security.SecureRandom;
    +
    +/**
    + * This class has routines to work with encryption within ORC files.
    + */
    +public class CryptoUtils {
    +
    +  private static final int COLUMN_ID_LENGTH = 3;
    +  private static final int KIND_LENGTH = 2;
    +  private static final int STRIPE_ID_LENGTH = 3;
    +  private static final int MIN_COUNT_BYTES = 8;
    +
    +  private static final SecureRandom random = new SecureRandom();
    --- End diff --
    
    oops. Removed.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202181381
  
    --- Diff: java/core/src/java/org/apache/orc/impl/CryptoUtils.java ---
    @@ -0,0 +1,77 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.orc.impl;
    +
    +import org.apache.orc.EncryptionAlgorithm;
    +import java.security.SecureRandom;
    +
    +/**
    + * This class has routines to work with encryption within ORC files.
    + */
    +public class CryptoUtils {
    +
    +  private static final int COLUMN_ID_LENGTH = 3;
    +  private static final int KIND_LENGTH = 2;
    +  private static final int STRIPE_ID_LENGTH = 3;
    +  private static final int MIN_COUNT_BYTES = 8;
    +
    +  private static final SecureRandom random = new SecureRandom();
    --- End diff --
    
    unused?


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202190526
  
    --- Diff: java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.orc.impl.writer;
    +
    +import org.apache.orc.CompressionCodec;
    +import org.apache.orc.EncryptionAlgorithm;
    +
    +import java.security.Key;
    +
    +/**
    + * The compression and encryption options for writing a stream.
    + */
    +public class StreamOptions {
    --- End diff --
    
    Is this duplicate class? I saw StreamOptions class above as well. 


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202187998
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -173,33 +203,208 @@ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
         }
       }
     
    +  /**
    +   * Manage the state of the decryption, including the ability to seek.
    +   */
    +  static class EncryptionState {
    +    private final String name;
    +    private final EncryptionAlgorithm algorithm;
    +    private final Key key;
    +    private final byte[] iv;
    +    private final Cipher cipher;
    +    private ByteBuffer decrypted;
    +
    +    EncryptionState(String name, StreamOptions options) {
    +      this.name = name;
    +      algorithm = options.algorithm;
    +      key = options.key;
    +      iv = options.iv;
    +      cipher = algorithm.createCipher();
    +    }
    +
    +    /**
    +     * We are seeking to a new range, so update the cipher to change the IV
    +     * to match. This code assumes that we only support encryption in CTR mode.
    +     * @param offset where we are seeking to in the stream
    +     */
    +    void changeIv(long offset) {
    +      int blockSize = cipher.getBlockSize();
    +      long encryptionBlocks = offset / blockSize;
    --- End diff --
    
    Add +1 if there is extra?


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202471794
  
    --- Diff: java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java ---
    @@ -0,0 +1,85 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.orc.impl.writer;
    +
    +import org.apache.orc.CompressionCodec;
    +import org.apache.orc.EncryptionAlgorithm;
    +
    +import java.security.Key;
    +
    +/**
    + * The compression and encryption options for writing a stream.
    + */
    +public class StreamOptions {
    --- End diff --
    
    It is org.apache.orc.impl.InStream.StreamOptions and org.apache.orc.impl.writer.StreamOptions. I guess they should be named the same way, but they are separate classes.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202144150
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -55,112 +60,137 @@ public long getStreamLength() {
       @Override
       public abstract void close();
     
    +  static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
    +    int result = 0;
    +    DiskRangeList range = list;
    +    while (range != null && range != current) {
    +      result += 1;
    +      range = range.next;
    +    }
    +    return result;
    +  }
    +
    +  /**
    +   * Implements a stream over an uncompressed stream.
    +   */
       public static class UncompressedStream extends InStream {
    -    private List<DiskRange> bytes;
    +    private DiskRangeList bytes;
         private long length;
         protected long currentOffset;
    -    private ByteBuffer range;
    -    private int currentRange;
    +    protected ByteBuffer decrypted;
    +    protected DiskRangeList currentRange;
    +
    +    /**
    +     * Create the stream without calling reset on it.
    +     * This is used for the subclass that needs to do more setup.
    +     * @param name name of the stream
    +     * @param length the number of bytes for the stream
    +     */
    +    public UncompressedStream(String name, long length) {
    +      super(name, length);
    +    }
     
    -    public UncompressedStream(String name, List<DiskRange> input, long length) {
    +    public UncompressedStream(String name,
    +                              DiskRangeList input,
    +                              long length) {
           super(name, length);
           reset(input, length);
         }
     
    -    protected void reset(List<DiskRange> input, long length) {
    +    protected void reset(DiskRangeList input, long length) {
           this.bytes = input;
           this.length = length;
    -      currentRange = 0;
    -      currentOffset = 0;
    -      range = null;
    +      currentOffset = input == null ? 0 : input.getOffset();
    +      setCurrent(input, true);
         }
     
         @Override
         public int read() {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
           currentOffset += 1;
    -      return 0xff & range.get();
    +      return 0xff & decrypted.get();
    +    }
    +
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        decrypted = newRange.getData().slice();
    +        decrypted.position((int) (currentOffset - newRange.getOffset()));
    --- End diff --
    
    Could you leave a comment as in why positioning is done this way? Looks like this is when 2 diskranges overlap. 


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202147519
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -55,112 +60,137 @@ public long getStreamLength() {
       @Override
       public abstract void close();
     
    +  static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
    +    int result = 0;
    +    DiskRangeList range = list;
    +    while (range != null && range != current) {
    +      result += 1;
    +      range = range.next;
    +    }
    +    return result;
    +  }
    +
    +  /**
    +   * Implements a stream over an uncompressed stream.
    +   */
       public static class UncompressedStream extends InStream {
    -    private List<DiskRange> bytes;
    +    private DiskRangeList bytes;
         private long length;
         protected long currentOffset;
    -    private ByteBuffer range;
    -    private int currentRange;
    +    protected ByteBuffer decrypted;
    +    protected DiskRangeList currentRange;
    +
    +    /**
    +     * Create the stream without calling reset on it.
    +     * This is used for the subclass that needs to do more setup.
    +     * @param name name of the stream
    +     * @param length the number of bytes for the stream
    +     */
    +    public UncompressedStream(String name, long length) {
    +      super(name, length);
    +    }
     
    -    public UncompressedStream(String name, List<DiskRange> input, long length) {
    +    public UncompressedStream(String name,
    +                              DiskRangeList input,
    +                              long length) {
           super(name, length);
           reset(input, length);
         }
     
    -    protected void reset(List<DiskRange> input, long length) {
    +    protected void reset(DiskRangeList input, long length) {
           this.bytes = input;
           this.length = length;
    -      currentRange = 0;
    -      currentOffset = 0;
    -      range = null;
    +      currentOffset = input == null ? 0 : input.getOffset();
    +      setCurrent(input, true);
         }
     
         @Override
         public int read() {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
           currentOffset += 1;
    -      return 0xff & range.get();
    +      return 0xff & decrypted.get();
    +    }
    +
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        decrypted = newRange.getData().slice();
    +        decrypted.position((int) (currentOffset - newRange.getOffset()));
    +      }
         }
     
         @Override
         public int read(byte[] data, int offset, int length) {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == this.length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
    -      int actualLength = Math.min(length, range.remaining());
    -      range.get(data, offset, actualLength);
    +      int actualLength = Math.min(length, decrypted.remaining());
    +      decrypted.get(data, offset, actualLength);
           currentOffset += actualLength;
           return actualLength;
         }
     
         @Override
         public int available() {
    -      if (range != null && range.remaining() > 0) {
    -        return range.remaining();
    +      if (decrypted != null && decrypted.remaining() > 0) {
    +        return decrypted.remaining();
           }
           return (int) (length - currentOffset);
         }
     
         @Override
         public void close() {
    -      currentRange = bytes.size();
    +      currentRange = null;
           currentOffset = length;
           // explicit de-ref of bytes[]
    -      bytes.clear();
    +      decrypted = null;
    +      bytes = null;
         }
     
         @Override
         public void seek(PositionProvider index) throws IOException {
           seek(index.getNext());
         }
     
    -    public void seek(long desired) {
    -      if (desired == 0 && bytes.isEmpty()) {
    +    public void seek(long desired) throws IOException {
    +      if (desired == 0 && bytes == null) {
             return;
           }
    -      int i = 0;
    -      for (DiskRange curRange : bytes) {
    -        if (curRange.getOffset() <= desired &&
    -            (desired - curRange.getOffset()) < curRange.getLength()) {
    -          currentOffset = desired;
    -          currentRange = i;
    -          this.range = curRange.getData().duplicate();
    -          int pos = range.position();
    -          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
    -          this.range.position(pos);
    -          return;
    -        }
    -        ++i;
    -      }
    -      // if they are seeking to the precise end, go ahead and let them go there
    -      int segments = bytes.size();
    -      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
    +      // If we are seeking inside of the current range, just reposition.
    +      if (currentRange != null && desired >= currentRange.getOffset() &&
    +          desired < currentRange.getEnd()) {
    +        decrypted.position((int) (desired - currentRange.getOffset()));
             currentOffset = desired;
    -        currentRange = segments - 1;
    -        DiskRange curRange = bytes.get(currentRange);
    -        this.range = curRange.getData().duplicate();
    -        int pos = range.position();
    -        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
    -        this.range.position(pos);
    -        return;
    +      } else {
    +        for (DiskRangeList curRange = bytes; curRange != null;
    +             curRange = curRange.next) {
    +          if (curRange.getOffset() <= desired &&
    --- End diff --
    
    can this be simplified to (desired >= curRange.getOffset() && desired <= curRange.getEnd()) ? unless I am missing something here. 


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202472637
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -173,33 +203,208 @@ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
         }
       }
     
    +  /**
    +   * Manage the state of the decryption, including the ability to seek.
    +   */
    +  static class EncryptionState {
    +    private final String name;
    +    private final EncryptionAlgorithm algorithm;
    +    private final Key key;
    +    private final byte[] iv;
    +    private final Cipher cipher;
    +    private ByteBuffer decrypted;
    +
    +    EncryptionState(String name, StreamOptions options) {
    +      this.name = name;
    +      algorithm = options.algorithm;
    +      key = options.key;
    +      iv = options.iv;
    +      cipher = algorithm.createCipher();
    +    }
    +
    +    /**
    +     * We are seeking to a new range, so update the cipher to change the IV
    +     * to match. This code assumes that we only support encryption in CTR mode.
    --- End diff --
    
    Ok, I added a comment in CryptoUtils.createIvForStream. In InStream, the IV could be anything they provide although ORC will always have 0 in the bottom 8 bytes.


---

[GitHub] orc pull request #278: ORC-251: Extend InStream and OutStream to support enc...

Posted by omalley <gi...@git.apache.org>.
Github user omalley commented on a diff in the pull request:

    https://github.com/apache/orc/pull/278#discussion_r202433445
  
    --- Diff: java/core/src/java/org/apache/orc/impl/InStream.java ---
    @@ -55,112 +60,137 @@ public long getStreamLength() {
       @Override
       public abstract void close();
     
    +  static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
    +    int result = 0;
    +    DiskRangeList range = list;
    +    while (range != null && range != current) {
    +      result += 1;
    +      range = range.next;
    +    }
    +    return result;
    +  }
    +
    +  /**
    +   * Implements a stream over an uncompressed stream.
    +   */
       public static class UncompressedStream extends InStream {
    -    private List<DiskRange> bytes;
    +    private DiskRangeList bytes;
         private long length;
         protected long currentOffset;
    -    private ByteBuffer range;
    -    private int currentRange;
    +    protected ByteBuffer decrypted;
    +    protected DiskRangeList currentRange;
    +
    +    /**
    +     * Create the stream without calling reset on it.
    +     * This is used for the subclass that needs to do more setup.
    +     * @param name name of the stream
    +     * @param length the number of bytes for the stream
    +     */
    +    public UncompressedStream(String name, long length) {
    +      super(name, length);
    +    }
     
    -    public UncompressedStream(String name, List<DiskRange> input, long length) {
    +    public UncompressedStream(String name,
    +                              DiskRangeList input,
    +                              long length) {
           super(name, length);
           reset(input, length);
         }
     
    -    protected void reset(List<DiskRange> input, long length) {
    +    protected void reset(DiskRangeList input, long length) {
           this.bytes = input;
           this.length = length;
    -      currentRange = 0;
    -      currentOffset = 0;
    -      range = null;
    +      currentOffset = input == null ? 0 : input.getOffset();
    +      setCurrent(input, true);
         }
     
         @Override
         public int read() {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
           currentOffset += 1;
    -      return 0xff & range.get();
    +      return 0xff & decrypted.get();
    +    }
    +
    +    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
    +      currentRange = newRange;
    +      if (newRange != null) {
    +        decrypted = newRange.getData().slice();
    +        decrypted.position((int) (currentOffset - newRange.getOffset()));
    +      }
         }
     
         @Override
         public int read(byte[] data, int offset, int length) {
    -      if (range == null || range.remaining() == 0) {
    +      if (decrypted == null || decrypted.remaining() == 0) {
             if (currentOffset == this.length) {
               return -1;
             }
    -        seek(currentOffset);
    +        setCurrent(currentRange.next, false);
           }
    -      int actualLength = Math.min(length, range.remaining());
    -      range.get(data, offset, actualLength);
    +      int actualLength = Math.min(length, decrypted.remaining());
    +      decrypted.get(data, offset, actualLength);
           currentOffset += actualLength;
           return actualLength;
         }
     
         @Override
         public int available() {
    -      if (range != null && range.remaining() > 0) {
    -        return range.remaining();
    +      if (decrypted != null && decrypted.remaining() > 0) {
    +        return decrypted.remaining();
           }
           return (int) (length - currentOffset);
         }
     
         @Override
         public void close() {
    -      currentRange = bytes.size();
    +      currentRange = null;
           currentOffset = length;
           // explicit de-ref of bytes[]
    -      bytes.clear();
    +      decrypted = null;
    +      bytes = null;
         }
     
         @Override
         public void seek(PositionProvider index) throws IOException {
           seek(index.getNext());
         }
     
    -    public void seek(long desired) {
    -      if (desired == 0 && bytes.isEmpty()) {
    +    public void seek(long desired) throws IOException {
    +      if (desired == 0 && bytes == null) {
             return;
           }
    -      int i = 0;
    -      for (DiskRange curRange : bytes) {
    -        if (curRange.getOffset() <= desired &&
    -            (desired - curRange.getOffset()) < curRange.getLength()) {
    -          currentOffset = desired;
    -          currentRange = i;
    -          this.range = curRange.getData().duplicate();
    -          int pos = range.position();
    -          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
    -          this.range.position(pos);
    -          return;
    -        }
    -        ++i;
    -      }
    -      // if they are seeking to the precise end, go ahead and let them go there
    -      int segments = bytes.size();
    -      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
    +      // If we are seeking inside of the current range, just reposition.
    +      if (currentRange != null && desired >= currentRange.getOffset() &&
    +          desired < currentRange.getEnd()) {
    --- End diff --
    
    No, the currentRange.getEnd() is the position after the range (offset + length), so to see if an offset is in the currentRange, I need to use <.


---