Posted to dev@phoenix.apache.org by RCheungIT <gi...@git.apache.org> on 2016/05/31 15:50:31 UTC

[GitHub] phoenix pull request: Phoenix-2405

GitHub user RCheungIT opened a pull request:

    https://github.com/apache/phoenix/pull/171

    Phoenix-2405

    https://issues.apache.org/jira/browse/PHOENIX-2405

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/RCheungIT/phoenix PHOENIX-2405

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/phoenix/pull/171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #171
    
----
commit 86f134afb1d3e08c49fb49a8d655ab8de04be0d8
Author: Haoran Zhang <hu...@outlook.com>
Date:   2016-04-22T22:41:23Z

    Merge pull request #1 from apache/master
    
    Merge from master

commit 4598a1f082cffe709c6124361fba21760fab4d65
Author: RCheungIT <hu...@outlook.com>
Date:   2016-05-27T17:12:41Z

    add deferredByteBufferQueue

commit 16571f763858f348ea0832a23cc12cdf7cd41273
Author: RCheungIT <hu...@outlook.com>
Date:   2016-05-29T15:43:35Z

    extract interface for buffersegment and bufferqueue

commit de308b99c4485c56bc543eda7362c2d2d93d07c6
Author: RCheungIT <hu...@outlook.com>
Date:   2016-05-31T15:49:14Z

    add memory management

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] phoenix pull request: Phoenix-2405

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65294053
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferSegmentQueue.java ---
    @@ -0,0 +1,251 @@
    +package org.apache.phoenix.iterate;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +
    +import java.io.*;
    +import java.util.*;
    +
    +public abstract class BufferSegmentQueue<T> extends AbstractQueue<T> {
    +    protected static final int EOF = -1;
    +
    +    protected final int index;
    +    protected final int thresholdBytes;
    +    protected final boolean hasMaxQueueSize;
    +    protected long totalResultSize = 0;
    +    protected int maxResultSize = 0;
    +
    +    protected File file;
    +    private boolean isClosed = false;
    +    protected boolean flushBuffer = false;
    +    protected int flushedCount = 0;
    +
    +    private T current = null;
    +
    +    protected SegmentQueueFileIterator thisIterator;
    +    // iterators to close on close()
    +    protected List<SegmentQueueFileIterator> iterators;
    +
    +    public BufferSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
    +        this.index = index;
    +        this.thresholdBytes = thresholdBytes;
    +        this.hasMaxQueueSize = hasMaxQueueSize;
    +        this.iterators = Lists.<SegmentQueueFileIterator> newArrayList();
    +    }
    +
    +    abstract protected Queue<T> getInMemoryQueue();
    +    abstract protected int sizeOf(T e);
    +
    +
    +    public int index() {
    +        return this.index;
    +    }
    +
    +    public int size() {
    +        if (flushBuffer)
    +            return flushedCount;
    +        return getInMemoryQueue().size();
    +    }
    +
    +    public long getInMemByteSize() {
    +        if (flushBuffer)
    +            return 0;
    +        return totalResultSize;
    +    }
    +
    +    public boolean isFlushed() {
    +        return flushBuffer;
    +    }
    +
    +    @Override
    +    public boolean offer(T e) {
    +        if (isClosed || flushBuffer)
    +            return false;
    +
    +        boolean added = getInMemoryQueue().add(e);
    +        if (added) {
    +            try {
    +                flush(e);
    +            } catch (IOException ex) {
    +                throw new RuntimeException(ex);
    +            }
    +        }
    +
    +        return added;
    +    }
    +
    +    @Override
    +    public T peek() {
    +        if (current == null && !isClosed) {
    +            current = next();
    +        }
    +
    +        return current;
    +    }
    +
    +    @Override
    +    public T poll() {
    +        T ret = peek();
    +        if (!isClosed) {
    +            current = next();
    +        } else {
    +            current = null;
    +        }
    +
    +        return ret;
    +    }
    +
    +    @Override
    +    public Iterator<T> iterator() {
    +        if (isClosed)
    +            return null;
    +
    +        if (!flushBuffer)
    +            return getInMemoryQueue().iterator();
    +
    +        SegmentQueueFileIterator iterator = createSegmentQueueFileIterator(thisIterator);
    --- End diff --
    
    Just realized this existing implementation could be very inefficient: it creates a new InputStream and skips a few bytes every time this is called. Is it possible to always return the same iterator with the current read state? I guess it will just work after adopting the SpoolingResultIterator logic.
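
For illustration, a reusable, stateful iterator of the kind suggested here might look roughly like the following. This is only a sketch, not code from the pull request; it assumes it lives inside the segment queue class so it can use the existing file field and readFromBuffer method, and the class name is hypothetical.

    // Hypothetical sketch: keep one open stream and the current read position,
    // so repeated iterator() calls do not reopen the file and re-skip bytes.
    private class StatefulSegmentFileIterator implements Iterator<T> {
        private DataInputStream in; // opened lazily, exactly once
        private T next;             // read-ahead record

        @Override
        public boolean hasNext() {
            if (next == null) {
                try {
                    if (in == null) {
                        in = new DataInputStream(
                                new BufferedInputStream(new FileInputStream(file)));
                    }
                    next = readFromBuffer(in); // assumed to return null at EOF
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return next != null;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T result = next;
            next = null;
            return result;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }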



[GitHub] phoenix pull request: Phoenix-2405

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65294376
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/DeferredByteBufferSegmentQueue.java ---
    @@ -0,0 +1,123 @@
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +
    +import java.io.*;
    +import java.util.*;
    +
    +public abstract class DeferredByteBufferSegmentQueue<T> extends BufferSegmentQueue<T> {
    +
    +    final MemoryChunk chunk;
    +
    +    public DeferredByteBufferSegmentQueue(int index, int thresholdBytes,
    +                                          boolean hasMaxQueueSize, MemoryManager memoryManager) {
    +        super(index, thresholdBytes, hasMaxQueueSize);
    +        chunk = memoryManager.allocate(thresholdBytes);
    +    }
    +
    +    abstract protected void writeToBuffer(OutputStream outputStream, T e);
    +    abstract protected T readFromBuffer(DataInput dataInput);
    +
    +
    +    @Override
    +    protected SegmentQueueFileIterator createSegmentQueueFileIterator(SegmentQueueFileIterator iterator){
    +        return new DeferredSegmentQueueFileIterator(iterator);
    +    }
    +
    +    @Override
    +    protected SegmentQueueFileIterator createSegmentQueueFileIterator(){
    +        return new DeferredSegmentQueueFileIterator();
    +    }
    +
    +    @Override
    +    protected void flush(T entry) throws IOException {
    +        Queue<T> inMemQueue = getInMemoryQueue();
    +        int resultSize = sizeOf(entry);
    +        maxResultSize = Math.max(maxResultSize, resultSize);
    +        totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
    +        if (totalResultSize >= thresholdBytes) {
    +            this.file = File.createTempFile(UUID.randomUUID().toString(), null);
    +
    +            DeferredFileOutputStream spoolTo = new DeferredFileOutputStream(thresholdBytes, file) {
    +                @Override
    +                protected void thresholdReached() throws IOException {
    +                    try {
    +                        super.thresholdReached();
    +                    } finally {
    +                            chunk.close();
    --- End diff --
    
    There's an indentation problem here. You might not need these lines after the suggested change, but would you mind double-checking the code indentation in other places as well?



[GitHub] phoenix issue #171: Phoenix-2405

Posted by RCheungIT <gi...@git.apache.org>.
Github user RCheungIT commented on the issue:

    https://github.com/apache/phoenix/pull/171
  
    Pull looks good, but please squash all commits into one and prefix the commit message with the JIRA number so we can tie the two together: PHOENIX-2405
    Improve performance and stability of server side sort for ORDER BY
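
For reference, the squash could be done roughly like this, assuming the work sits on a local PHOENIX-2405 branch based on apache/master (adjust branch names as needed):

    $ git rebase -i master        # mark every commit except the first as "squash"
    $ git commit --amend -m "PHOENIX-2405 Improve performance and stability of server side sort for ORDER BY"
    $ git push -f origin PHOENIX-2405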



[GitHub] phoenix pull request: Phoenix-2405

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65293621
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/DeferredByteBufferSegmentQueue.java ---
    @@ -0,0 +1,123 @@
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +
    +import java.io.*;
    +import java.util.*;
    +
    +public abstract class DeferredByteBufferSegmentQueue<T> extends BufferSegmentQueue<T> {
    +
    +    final MemoryChunk chunk;
    +
    +    public DeferredByteBufferSegmentQueue(int index, int thresholdBytes,
    +                                          boolean hasMaxQueueSize, MemoryManager memoryManager) {
    +        super(index, thresholdBytes, hasMaxQueueSize);
    +        chunk = memoryManager.allocate(thresholdBytes);
    +    }
    +
    +    abstract protected void writeToBuffer(OutputStream outputStream, T e);
    +    abstract protected T readFromBuffer(DataInput dataInput);
    --- End diff --
    
    I know they are the existing method names, but they might be even clearer if changed to "writeRecord(ToBuffer)" and "readRecord(FromBuffer)"?



[GitHub] phoenix pull request: Phoenix-2405

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65291859
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---
    @@ -234,6 +234,12 @@ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocesso
                         scan.getAttribute(QueryConstants.LAST_SCAN) != null);
             }
             final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
    +        
    +        if(iterator != null){
    +            iterator.setMemoryManager(GlobalCache.getTenantCache(c.getEnvironment(), tenantId).getMemoryManager());
    +        }
    +
    +
    --- End diff --
    
    Is it possible to pass this MemoryManager into the constructor instead of using the setter method?
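
For illustration, the constructor-based wiring being asked about might look roughly like this at the call site. It is only a sketch: deserializeFromScan does not currently take a MemoryManager argument, so the extra parameter shown here is hypothetical.

    // Hypothetical: resolve the MemoryManager first and hand it to the
    // iterator at construction time instead of calling a setter afterwards.
    MemoryManager memoryManager =
            GlobalCache.getTenantCache(c.getEnvironment(), tenantId).getMemoryManager();
    final OrderedResultIterator iterator =
            deserializeFromScan(scan, innerScanner, memoryManager);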



[GitHub] phoenix pull request: Phoenix-2405

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65293343
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/DeferredByteBufferSegmentQueue.java ---
    @@ -0,0 +1,123 @@
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +
    +import java.io.*;
    +import java.util.*;
    +
    +public abstract class DeferredByteBufferSegmentQueue<T> extends BufferSegmentQueue<T> {
    +
    +    final MemoryChunk chunk;
    +
    +    public DeferredByteBufferSegmentQueue(int index, int thresholdBytes,
    +                                          boolean hasMaxQueueSize, MemoryManager memoryManager) {
    +        super(index, thresholdBytes, hasMaxQueueSize);
    +        chunk = memoryManager.allocate(thresholdBytes);
    +    }
    +
    +    abstract protected void writeToBuffer(OutputStream outputStream, T e);
    +    abstract protected T readFromBuffer(DataInput dataInput);
    +
    +
    +    @Override
    +    protected SegmentQueueFileIterator createSegmentQueueFileIterator(SegmentQueueFileIterator iterator){
    +        return new DeferredSegmentQueueFileIterator(iterator);
    +    }
    +
    +    @Override
    +    protected SegmentQueueFileIterator createSegmentQueueFileIterator(){
    +        return new DeferredSegmentQueueFileIterator();
    +    }
    +
    +    @Override
    +    protected void flush(T entry) throws IOException {
    +        Queue<T> inMemQueue = getInMemoryQueue();
    +        int resultSize = sizeOf(entry);
    +        maxResultSize = Math.max(maxResultSize, resultSize);
    +        totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
    +        if (totalResultSize >= thresholdBytes) {
    +            this.file = File.createTempFile(UUID.randomUUID().toString(), null);
    +
    +            DeferredFileOutputStream spoolTo = new DeferredFileOutputStream(thresholdBytes, file) {
    +                @Override
    +                protected void thresholdReached() throws IOException {
    +                    try {
    +                        super.thresholdReached();
    +                    } finally {
    +                            chunk.close();
    +                    }
    +                }
    +            };
    +
    +            int resSize = inMemQueue.size();
    +            for (int i = 0; i < resSize; i++) {
    +                writeToBuffer(spoolTo, inMemQueue.poll());
    +            }
    +
    +            spoolTo.write(EOF); // end
    +            spoolTo.flush();
    +            flushedCount = resSize;
    +            inMemQueue.clear();
    +            flushBuffer = true;
    +        }
    +    }
    +
    +    private class DeferredSegmentQueueFileIterator extends SegmentQueueFileIterator {
    +        private DataInputStream dataInput;
    +
    +        public DeferredSegmentQueueFileIterator() {
    +            super();
    +        }
    +
    +        public DeferredSegmentQueueFileIterator(SegmentQueueFileIterator iterator) {
    +            super(iterator);
    +        }
    +
    +
    +
    +        @Override
    +        protected void init(long readIndex) {
    +            this.isEnd = false;
    +            this.readIndex = readIndex;
    +            this.next = null;
    +
    +            try {
    +                BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
    +                bufferedInputStream.skip(readIndex);
    +                this.dataInput = new DataInputStream(bufferedInputStream);
    +            } catch (IOException e) {
    --- End diff --
    
    Like I said in the above comment, you are not using DeferredFileOutputStream as a deferred stream; you are just using it as an ordinary FileOutputStream. So maybe forget about handling the DeferredFileOutputStream yourself and use the adapted SpoolingResultIterator logic instead.



[GitHub] phoenix pull request #171: Phoenix-2405

Posted by RCheungIT <gi...@git.apache.org>.
Github user RCheungIT closed the pull request at:

    https://github.com/apache/phoenix/pull/171



[GitHub] phoenix pull request: Phoenix-2405

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65294499
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---
    @@ -572,11 +569,11 @@ public MappedByteBufferTupleQueue(int thresholdBytes) {
             }
     
             @Override
    -        protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() {
    -            return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() {
    +        protected Comparator<BufferSegmentQueue<Tuple>> getSegmentQueueComparator() {
    +            return new Comparator<BufferSegmentQueue<Tuple>>() {
                     @Override
    -                public int compare(MappedByteBufferSegmentQueue<Tuple> q1, 
    -                        MappedByteBufferSegmentQueue<Tuple> q2) {
    +                public int compare(BufferSegmentQueue<Tuple> q1,
    +                                   BufferSegmentQueue<Tuple> q2) {
                         return q1.index() - q2.index();
    --- End diff --
    
    I think we should eventually get rid of MappedByteBufferSegmentQueue and use the DeferredXXX version instead. Maybe that'll also simplify the level of abstraction here. Go with the other changes first and see what you can do here.



[GitHub] phoenix issue #171: Phoenix-2405

Posted by RCheungIT <gi...@git.apache.org>.
Github user RCheungIT commented on the issue:

    https://github.com/apache/phoenix/pull/171
  
    Once you get this completed, it'd be good to compare the performance of the old vs. the new implementation. Maybe Mujtaba can help with that.



[GitHub] phoenix pull request #171: Phoenix-2405

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65807620
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/DeferredByteBufferSegmentQueue.java ---
    @@ -0,0 +1,123 @@
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +
    +import java.io.*;
    +import java.util.*;
    +
    +public abstract class DeferredByteBufferSegmentQueue<T> extends BufferSegmentQueue<T> {
    +
    +    final MemoryChunk chunk;
    +
    +    public DeferredByteBufferSegmentQueue(int index, int thresholdBytes,
    +                                          boolean hasMaxQueueSize, MemoryManager memoryManager) {
    +        super(index, thresholdBytes, hasMaxQueueSize);
    +        chunk = memoryManager.allocate(thresholdBytes);
    --- End diff --
    
    FYI, SpoolingResultIterator is deprecated, so no need to refactor. Feel free to copy/paste code from there, though, as conceptually it's what you need.



[GitHub] phoenix pull request: Phoenix-2405

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/171#discussion_r65293136
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/DeferredByteBufferSegmentQueue.java ---
    @@ -0,0 +1,123 @@
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +
    +import java.io.*;
    +import java.util.*;
    +
    +public abstract class DeferredByteBufferSegmentQueue<T> extends BufferSegmentQueue<T> {
    +
    +    final MemoryChunk chunk;
    +
    +    public DeferredByteBufferSegmentQueue(int index, int thresholdBytes,
    +                                          boolean hasMaxQueueSize, MemoryManager memoryManager) {
    +        super(index, thresholdBytes, hasMaxQueueSize);
    +        chunk = memoryManager.allocate(thresholdBytes);
    --- End diff --
    
    "thresholdBytes" might be confusing here. There are actually two occurrences of memory usage here, first one being in-memory priority queue for sorting, once that part, the size of which is rather an estimate (based on the priority queue data structure) than an actual value, has reached the threshold, the priority queue content should be written to a some kind of file OutputStream, which is now DeferredFileOutputStream. The second memory usage is that used by DeferredFileOutputStream itself, since its content will first stay in memory before its own threshold is reached.
    Therefore, we might need to allocate twice (it's not real allocate anyway, it's for tracking memory usage actually). But a better way to do this is to make use of SpoolingResultIterator logic to handle the entire second part as mentioned above. They should be exactly the same logic except that SpoolingResultIterator writes and reads Tuples and what you need here is something that writes and reads ResultEntry. So see if you can apply some abstraction here.
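
For illustration, the kind of abstraction being suggested might look roughly like the following. It is only a sketch, not existing Phoenix code: RecordCodec and SpoolingRecordWriter are hypothetical names, and a real version would also cover the read path and end-of-stream handling.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.IOException;

    import org.apache.commons.io.output.DeferredFileOutputStream;
    import org.apache.phoenix.memory.MemoryManager;
    import org.apache.phoenix.memory.MemoryManager.MemoryChunk;

    // Hypothetical sketch of the suggested abstraction: the spool-to-disk logic
    // is written once, and per-record serialization is plugged in, so the same
    // code can serve Tuple (as in SpoolingResultIterator) and ResultEntry.
    interface RecordCodec<T> {
        void writeRecord(DataOutput out, T record) throws IOException;
        T readRecord(DataInput in) throws IOException; // null at end of stream
    }

    class SpoolingRecordWriter<T> {
        private final RecordCodec<T> codec;
        private final MemoryChunk chunk; // tracks (rather than reserves) memory usage
        private final DataOutputStream out;

        SpoolingRecordWriter(RecordCodec<T> codec, MemoryManager memoryManager,
                             int thresholdBytes, File spillFile) {
            this.codec = codec;
            this.chunk = memoryManager.allocate(thresholdBytes);
            DeferredFileOutputStream spool =
                    new DeferredFileOutputStream(thresholdBytes, spillFile) {
                @Override
                protected void thresholdReached() throws IOException {
                    try {
                        super.thresholdReached(); // switch from memory to the file
                    } finally {
                        chunk.close();            // stop tracking once spilled to disk
                    }
                }
            };
            this.out = new DataOutputStream(spool);
        }

        void add(T record) throws IOException {
            codec.writeRecord(out, record);
        }
    }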

