Posted to dev@phoenix.apache.org by RCheungIT <gi...@git.apache.org> on 2016/06/19 19:36:12 UTC

[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

GitHub user RCheungIT opened a pull request:

    https://github.com/apache/phoenix/pull/175

    PHOENIX-2405 Not for merge, just request a review to check whether I'm on the right way

    https://issues.apache.org/jira/browse/PHOENIX-2405

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/RCheungIT/phoenix PHOENIX-2405-HBase-1.2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/phoenix/pull/175.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #175
    
----
commit 6566ccd5fa4e8fbfa4fea4e743ca56143d17bc48
Author: RCheungIT <hu...@outlook.com>
Date:   2016-05-27T17:12:41Z

    PHOENIX-2405 Improve performance and stability of server side sort for
    ORDER BY

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

Posted by RCheungIT <gi...@git.apache.org>.
Github user RCheungIT commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69728427
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java ---
    @@ -0,0 +1,357 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.input.CountingInputStream;
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.query.QueryServices;
    +import org.apache.phoenix.query.QueryServicesOptions;
    +
    +
    +import java.io.*;
    +import java.util.AbstractQueue;
    +import java.util.Iterator;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
    +
    +
    +public abstract class SpoolingByteBufferSegmentQueue<T>  extends AbstractQueue<T> {
    --- End diff --
    
    Hi @maryannxue, I don't think I understand your idea here. Once SpoolingResultIterator has been modified, where should the modified SpoolingResultIterator go? What I did here was rework SpoolingResultIterator into SpoolingByteBufferSegmentQueue, so SpoolingByteBufferSegmentQueue handles the priority queue logic and the deferred byte buffer logic at the same time.



[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

Posted by RCheungIT <gi...@git.apache.org>.
Github user RCheungIT commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69792221
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java ---
    @@ -0,0 +1,357 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.input.CountingInputStream;
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.query.QueryServices;
    +import org.apache.phoenix.query.QueryServicesOptions;
    +
    +
    +import java.io.*;
    +import java.util.AbstractQueue;
    +import java.util.Iterator;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
    +
    +
    +public abstract class SpoolingByteBufferSegmentQueue<T>  extends AbstractQueue<T> {
    --- End diff --
    
    Hi @maryannxue, I think I get your idea now. Do you mean I should not change the in-memory sort logic? All I need to do is modify the flush logic so that the sorted result is flushed to SpoolingResultIterator. Is that correct? Thanks.



[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69667743
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java ---
    @@ -0,0 +1,357 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.input.CountingInputStream;
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.query.QueryServices;
    +import org.apache.phoenix.query.QueryServicesOptions;
    +
    +
    +import java.io.*;
    +import java.util.AbstractQueue;
    +import java.util.Iterator;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
    +
    +
    +public abstract class SpoolingByteBufferSegmentQueue<T>  extends AbstractQueue<T> {
    --- End diff --
    
    The logic is very confusing here. My idea was to extend or modify the current SpoolingResultIterator so that it can take a ResultEntry and/or a Tuple as a record. But that has nothing to do with the XXXQueue here: XXXQueue deals with the priority queue logic and SpoolingXXX deals with the deferred byte buffer logic. Let me know whether you understand how it's supposed to work.
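
A minimal sketch of the separation described in the comment above, not the Phoenix implementation. `SpoolingBuffer` and `SortingQueue` are hypothetical names, records are plain `byte[]` with a 4-byte length prefix, and a temp file stands in for the spool directory. The point is only that the ordering class knows nothing about thresholds or files, and the spooling class knows nothing about ordering.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical: owns only the "memory first, then disk" byte buffer logic.
final class SpoolingBuffer implements AutoCloseable {
    private final int thresholdBytes;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream disk; // stays null while the data fits in memory
    private File file;
    private int records;

    SpoolingBuffer(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    void write(byte[] record) {
        try {
            if (disk == null && memory.size() + 4 + record.length > thresholdBytes) {
                // Threshold crossed: move what is buffered to a temp file, once.
                file = File.createTempFile("ResultSpooler", ".bin");
                file.deleteOnExit();
                disk = new FileOutputStream(file);
                memory.writeTo(disk);
                memory = null;
            }
            DataOutputStream out = new DataOutputStream(disk != null ? disk : memory);
            out.writeInt(record.length); // length prefix
            out.write(record);
            out.flush();
            records++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isInMemory() { return disk == null; }

    int recordCount() { return records; }

    @Override
    public void close() {
        try {
            if (disk != null) disk.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (file != null) file.delete();
        }
    }
}

// Hypothetical: owns only the ordering logic.
final class SortingQueue {
    private final PriorityQueue<byte[]> heap;

    SortingQueue(Comparator<byte[]> order) {
        heap = new PriorityQueue<>(order);
    }

    void add(byte[] record) { heap.add(record); }

    int size() { return heap.size(); }

    // Drains the records in sorted order into the spooling buffer.
    void flushTo(SpoolingBuffer target) {
        byte[] record;
        while ((record = heap.poll()) != null) {
            target.write(record);
        }
    }
}
```

With this split, the in-memory sort stays untouched and only `flushTo` touches the spooling side.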



[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69668528
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---
    @@ -133,7 +134,10 @@ public PeekingResultIterator newIterator(StatementContext context, ResultIterato
                 Expression expression = RowKeyExpression.INSTANCE;
                 OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
                 int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -            return new OrderedResultIterator(scanner, Collections.<OrderByExpression>singletonList(orderByExpression), threshold);
    +            String spoolDirectory = services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY);
    --- End diff --
    
    In my previous review I might not have made it very clear that the idea of moving MemoryManager into constructors was more of a question than advice. Since I hadn't studied the code very carefully, I was asking whether that would be better or worse. It looks like these parameters are now spreading all over the place, so perhaps my suggestion was not a good one?



[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69668752
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---
    @@ -45,10 +45,7 @@
     import org.apache.phoenix.exception.SQLExceptionInfo;
     import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
     import org.apache.phoenix.expression.Expression;
    -import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
    -import org.apache.phoenix.iterate.MappedByteBufferQueue;
    -import org.apache.phoenix.iterate.ParallelScanGrouper;
    -import org.apache.phoenix.iterate.ResultIterator;
    +import org.apache.phoenix.iterate.*;
    --- End diff --
    
    Please always check your patch before you submit it. This is not the right coding style, and such changes should never appear in a patch/pull request.



[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69668023
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java ---
    @@ -0,0 +1,357 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.input.CountingInputStream;
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.query.QueryServices;
    +import org.apache.phoenix.query.QueryServicesOptions;
    +
    +
    +import java.io.*;
    +import java.util.AbstractQueue;
    +import java.util.Iterator;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
    +
    +
    +public abstract class SpoolingByteBufferSegmentQueue<T>  extends AbstractQueue<T> {
    +
    +    private ResultQueue<T> spoolFrom;
    +
    +    private boolean closed ;
    +    private boolean flushed;
    +    private DeferredFileOutputStream spoolTo;
    +    private MemoryChunk chunk;
    +    private int size = 0;
    +    private long inMemByteSize = 0L;
    +    private int index;
    +
    +
    +
    +    SpoolingByteBufferSegmentQueue(int index, MemoryManager mm, final int thresholdBytes, String spoolDirectory)  {
    +
    +        long startTime = System.currentTimeMillis();
    +        chunk  = mm.allocate(0, thresholdBytes);
    +        long waitTime = System.currentTimeMillis() - startTime;
    +        GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
    +
    +        int size = (int)chunk.getSize();
    +        spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) {
    +            @Override
    +            protected void thresholdReached() throws IOException {
    +                try {
    +                    super.thresholdReached();
    +                } finally {
    +                    chunk.close();
    +                }
    +            }
    +        };
    +
    +
    +    }
    +
    +    public int index() {
    +        return this.index;
    +    }
    +
    +
    +
    +    protected abstract InMemoryResultQueue<T> createInMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk);
    +
    +    protected abstract OnDiskResultQueue<T> createOnDiskResultQueue(File file);
    +
    +    @Override
    +    public boolean offer(T t) {
    +        if (closed || flushed){
    +            return false;
    +        }
    +        boolean result = writeRecord(t, spoolTo);
    +        if(result){
    +            if(!spoolTo.isInMemory()){
    +                flushToDisk();
    +            }
    +            size++;
    +        }
    +
    +
    +        return result;
    +    }
    +
    +    protected abstract boolean writeRecord(T t, OutputStream outputStream);
    +
    +    private void flushToMemory(){
    +        byte[] data = spoolTo.getData();
    +        chunk.resize(data.length);
    +        spoolFrom = createInMemoryResultQueue(data, chunk);
    +        GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
    +        flushed = true;
    +    }
    +
    +
    +    private void flushToDisk(){
    +        long sizeOfSpoolFile = spoolTo.getFile().length();
    +        GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
    +        GLOBAL_SPOOL_FILE_COUNTER.increment();
    +        spoolFrom = createOnDiskResultQueue(spoolTo.getFile());
    +        if (spoolTo.getFile() != null) {
    +            spoolTo.getFile().deleteOnExit();
    +        }
    +        inMemByteSize = 0;
    +        flushed = true;
    +    }
    +
    +
    +    public boolean isFlushed(){
    +        return flushed;
    +    }
    +
    +    public T peek() {
    +        if(!flushed){
    +            flushToMemory();
    +        }
    +        return spoolFrom.peek();
    +    }
    +
    +    @Override
    +    public T poll() {
    +        if(!flushed){
    +            flushToMemory();
    +        }
    +        return spoolFrom.poll();
    +    }
    +
    +    public void close() throws IOException {
    +        if(spoolFrom != null){
    +            spoolFrom.close();
    +        }
    +    }
    +
    +    @Override
    +    public Iterator<T> iterator() {
    +        if(!flushed){
    +            flushToMemory();
    +        }
    +        return spoolFrom.iterator();
    +    }
    +
    +    @Override
    +    public int size() {
    +        return size ;
    +    }
    +
    +    public long getInMemByteSize(){
    +        return inMemByteSize;
    +    };
    +
    +    private static abstract class ResultQueue<T> extends  AbstractQueue<T> implements  Closeable{}
    +
    +    protected static abstract class InMemoryResultQueue<T> extends ResultQueue<T> {
    +        private final MemoryChunk memoryChunk;
    +        protected final byte[] bytes;
    +        private T next;
    +        private AtomicInteger offset = new AtomicInteger(0);
    +
    +        protected InMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk) {
    +            this.bytes = bytes;
    +            this.memoryChunk = memoryChunk;
    +            advance(offset);
    +        }
    +
    +        protected abstract T advance(AtomicInteger offset);
    +
    +        @Override
    +        public boolean offer(T t) {
    +            return false;
    +        }
    +
    +        @Override
    +        public T peek(){
    +            return next;
    +        }
    +
    +        @Override
    +        public T poll() {
    +            T current = next;
    +            next = advance(offset);
    +            return current;
    +        }
    +
    +
    +        public void close() {
    +            memoryChunk.close();
    +        }
    +
    +
    +        @Override
    +        public Iterator<T> iterator() {
    +            return new Iterator<T>(){
    +                AtomicInteger iteratorOffset = new AtomicInteger(offset.get());
    +                private T next = advance(iteratorOffset);
    --- End diff --
    
    One reminder: I said in my previous review that we'd better avoid discarding an old XXXSegmentQueue and starting a new one every time we switch between these segment queues. So we should ultimately remove this offset thing.
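
One possible reading of this reminder, sketched under assumptions: if every segment keeps a single long-lived cursor that remembers its own position, switching between segments never needs a saved offset or a freshly constructed queue. `SegmentCursor` and `SegmentMerger` are hypothetical names, and sorted `List<Integer>` values stand in for the serialized segments.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical: one long-lived cursor per sorted segment.
final class SegmentCursor implements Comparable<SegmentCursor> {
    private final Iterator<Integer> source;
    private Integer head; // next unread element, or null when exhausted

    SegmentCursor(List<Integer> sortedSegment) {
        source = sortedSegment.iterator();
        head = source.hasNext() ? source.next() : null;
    }

    boolean isEmpty() { return head == null; }

    // Returns the current head and advances the cursor in place.
    Integer pop() {
        Integer current = head;
        head = source.hasNext() ? source.next() : null;
        return current;
    }

    @Override
    public int compareTo(SegmentCursor other) {
        return head.compareTo(other.head); // only called on non-empty cursors
    }
}

final class SegmentMerger {
    // Merges sorted segments, reusing the same cursor objects throughout.
    static List<Integer> merge(List<List<Integer>> sortedSegments) {
        PriorityQueue<SegmentCursor> heap = new PriorityQueue<>();
        for (List<Integer> segment : sortedSegments) {
            SegmentCursor cursor = new SegmentCursor(segment);
            if (!cursor.isEmpty()) heap.add(cursor);
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            SegmentCursor cursor = heap.poll();
            merged.add(cursor.pop());
            if (!cursor.isEmpty()) heap.add(cursor); // same object re-enters the heap
        }
        return merged;
    }
}
```

Each cursor is removed from the heap before it advances and re-added afterwards, so the heap never holds an element whose ordering key has changed underneath it.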



[GitHub] phoenix pull request #175: PHOENIX-2405 Not for merge, just request a review...

Posted by maryannxue <gi...@git.apache.org>.
Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/175#discussion_r69668052
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java ---
    @@ -0,0 +1,357 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import org.apache.commons.io.input.CountingInputStream;
    +import org.apache.commons.io.output.DeferredFileOutputStream;
    +
    +import org.apache.phoenix.memory.MemoryManager;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.query.QueryServices;
    +import org.apache.phoenix.query.QueryServicesOptions;
    +
    +
    +import java.io.*;
    +import java.util.AbstractQueue;
    +import java.util.Iterator;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
    +
    +
    +public abstract class SpoolingByteBufferSegmentQueue<T>  extends AbstractQueue<T> {
    +
    +    private ResultQueue<T> spoolFrom;
    +
    +    private boolean closed ;
    +    private boolean flushed;
    +    private DeferredFileOutputStream spoolTo;
    +    private MemoryChunk chunk;
    +    private int size = 0;
    +    private long inMemByteSize = 0L;
    +    private int index;
    +
    +
    +
    +    SpoolingByteBufferSegmentQueue(int index, MemoryManager mm, final int thresholdBytes, String spoolDirectory)  {
    +
    +        long startTime = System.currentTimeMillis();
    +        chunk  = mm.allocate(0, thresholdBytes);
    +        long waitTime = System.currentTimeMillis() - startTime;
    +        GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
    +
    +        int size = (int)chunk.getSize();
    +        spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) {
    +            @Override
    +            protected void thresholdReached() throws IOException {
    +                try {
    +                    super.thresholdReached();
    +                } finally {
    +                    chunk.close();
    +                }
    +            }
    +        };
    +
    +
    +    }
    +
    +    public int index() {
    +        return this.index;
    +    }
    +
    +
    +
    +    protected abstract InMemoryResultQueue<T> createInMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk);
    +
    +    protected abstract OnDiskResultQueue<T> createOnDiskResultQueue(File file);
    +
    +    @Override
    +    public boolean offer(T t) {
    +        if (closed || flushed){
    +            return false;
    +        }
    +        boolean result = writeRecord(t, spoolTo);
    +        if(result){
    --- End diff --
    
    I'm pretty lost as to why this "if" block is here. Does it mean we flush to disk all the time? I'll stop here though, leaving my other doubts aside, because all of this (the XXXQueue classes) is going to take a rewrite, I think.
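
For reference, a minimal model of the control flow in the quoted `offer()` hunk. `SealingSegment` is a hypothetical stand-in with no Phoenix or commons-io types, and a plain byte counter stands in for `spoolTo.isInMemory()`. Under those assumptions, the block appears to call `flushToDisk()` once, when the threshold is first crossed, after which the `flushed` flag makes `offer()` reject every further record.

```java
// Hypothetical model of the quoted offer() logic; not the Phoenix class.
final class SealingSegment {
    private final int thresholdBytes;
    private int bytesWritten;
    private boolean flushed;
    private int size;
    private int diskFlushes;

    SealingSegment(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    boolean offer(byte[] record) {
        if (flushed) {
            return false; // mirrors "if (closed || flushed) return false"
        }
        bytesWritten += record.length; // stands in for writeRecord(t, spoolTo)
        if (bytesWritten > thresholdBytes) { // stands in for !spoolTo.isInMemory()
            diskFlushes++;  // flushToDisk() in the quoted code
            flushed = true; // seals the segment against later offers
        }
        size++;
        return true;
    }

    int size() { return size; }

    int diskFlushes() { return diskFlushes; }
}
```

In this model, the record that crosses the threshold is still accepted, and the segment stops accepting input from the next call on.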

