Posted to dev@phoenix.apache.org by geraldss <gi...@git.apache.org> on 2018/07/02 20:45:46 UTC

[GitHub] phoenix pull request #308: Client-side hash aggregation

GitHub user geraldss opened a pull request:

    https://github.com/apache/phoenix/pull/308

    Client-side hash aggregation

    Client-side hash aggregation for use with sort-merge join.
    
    Implements https://issues.apache.org/jira/browse/PHOENIX-4751
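
    For illustration, a minimal JDBC sketch of how the new hint could be used.
    The table names and query shape here are hypothetical; the HASH_AGGREGATE
    hint name comes from the first commit below, and USE_SORT_MERGE_JOIN is
    Phoenix's existing sort-merge join hint:

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.ResultSet;
        import java.sql.Statement;

        public class HashAggExample {
            public static void main(String[] args) throws Exception {
                // The sort-merge join forces the GROUP BY to run on the client;
                // the HASH_AGGREGATE hint requests hash aggregation there in
                // place of the default sort-based client aggregation.
                try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                     Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(
                         "SELECT /*+ USE_SORT_MERGE_JOIN HASH_AGGREGATE */ t1.k, COUNT(*) " +
                         "FROM t1 JOIN t2 ON t1.k = t2.k GROUP BY t1.k")) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1) + " = " + rs.getLong(2));
                    }
                }
            }
        }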


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/geraldss/phoenix master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/phoenix/pull/308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #308
    
----
commit c8acc6cb39e222a5206c79566552c5c27cbe27f1
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-14T19:49:30Z

    PHOENIX-4751 Add HASH_AGGREGATE hint

commit a261b3f94f753b4a8d6baaad6168e76f97d76bb6
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-16T04:17:32Z

    PHOENIX-4751 Begin implementation of client hash aggregation

commit 863d24e34a83282f90d5d2db05522b678dfced74
Author: Rajeshbabu Chintaguntla <ra...@...>
Date:   2018-06-15T22:38:44Z

    PHOENIX-4786 Reduce log level to debug when logging new aggregate row key found and added results for scan ordered queries (Rajeshbabu)

commit cfae7ddcfa5b58a367cd0c57c23f394ceb9f1259
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-16T04:55:00Z

    Merge remote-tracking branch 'upstream/master'

commit 1f453308a24be49a8036292671d51eb25137d680
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-20T17:47:34Z

    PHOENIX-4751 Generated aggregated results

commit 66aaacfd989c63e18fb9a5c5b9e133519ab93507
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-24T23:18:14Z

    PHOENIX-4751 Sort results of client hash aggregation

commit a6c2b7ce738710cfdffc1e9e4d1d234d2090a225
Author: James Taylor <ja...@...>
Date:   2018-06-18T13:00:02Z

    PHOENIX-4789 Exception when setting TTL on Tephra transactional table

commit fba4196fcace83d4e42e902d2cb6295bb519ed39
Author: Ankit Singhal <an...@...>
Date:   2018-06-21T23:11:02Z

    PHOENIX-4785 Unable to write to table if index is made active during retry

commit 05de081b386c502b6c90ff24357ed7dbbc6dedd2
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-29T05:01:55Z

    PHOENIX-4751 Add integration test for client hash aggregation

commit b7960d0daedc6ce3c2fbcf0794e4a95639d7ba3c
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-30T00:03:59Z

    PHOENIX-4751 Fix and run integration tests for query results

commit a3629ac64b90c117f5caceddbb45fb9dc14649b8
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-30T06:22:43Z

    PHOENIX-4751 Add integration test for EXPLAIN

commit 3aa85d5c04309f6e0c5167c002e9dcb6091ea757
Author: Gerald Sangudi <gs...@...>
Date:   2018-06-30T17:13:17Z

    PHOENIX-4751 Verify EXPLAIN plan for both salted and unsalted

----


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    Looks like the move of the 5.x branch to become master messed up this PR. Can you please start a new pull request against the 4.x-HBase-1.4 branch (this is what master was before), @geraldss? FYI, the last step will be to get a test run with your patch in place to make sure there are no regressions. To do that, attach a .patch file to the JIRA (include the branch name in the name of the patch file) and click the Submit Patch button. See http://phoenix.apache.org/contributing.html#Generate_a_patch for more info on that.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205296403
  
    --- Diff: phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.end2end;
    +
    +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.Statement;
    +import java.util.Properties;
    +
    +import org.apache.phoenix.util.PropertiesUtil;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.junit.Test;
    +
    +public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
    --- End diff --
    
    The current tests cover when forward sort is required. My comment below addresses reverse sort.


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    Hi @JamesRTaylor, I pushed another change and replied with some comments. Please review, thanks.


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by solzy <gi...@git.apache.org>.
Github user solzy commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    *@JamesRTaylor* Thanks for the clarification.
    
    
    ----------------------------------------
       Yun Zhang
       Best regards!
    
    
    2018-07-17 6:18 GMT+08:00 James Taylor <no...@github.com>:
    
    > *@JamesRTaylor* commented on this pull request.
    > ------------------------------
    >
    > In phoenix-core/src/main/java/org/apache/phoenix/iterate/
    > ClientHashAggregatingResultIterator.java
    > <https://github.com/apache/phoenix/pull/308#discussion_r202842452>:
    >
    > > +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    > +        try {
    > +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    > +            ptr.set(key.get(), key.getOffset(), key.getLength());
    > +            return ptr;
    > +        } catch (IOException e) {
    > +            throw new SQLException(e);
    > +        }
    > +    }
    > +
    > +    // Copied from ClientGroupedAggregatingResultIterator
    > +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    > +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    > +    }
    > +
    > +    private void populateHash() throws SQLException {
    >
    > @geraldss <https://github.com/geraldss> - memory management is tracked by
    > the GlobalMemoryManager. Operations that potentially use memory allocate
    > (and eventually free) a set of MemoryChunk instances. You can see an
    > example of this in GroupedAggregateRegionObserver (the runtime code for
    > aggregation). If the memory used goes over a threshold (phoenix.query.maxGlobalMemoryPercentage
    > and phoenix.query.maxTenantMemoryPercentage as the allowed percentage of
    > Java heap across all queries that is allowed to be used), then the query
    > will fail. Most typically, this mechanism is used on the server side as we
    > don't typically use a lot of memory on the client-side (as we're mostly
    > doing merge joins). One example where we use this on the client side is for
    > our broadcast join implementation (see HashCacheClient) to track memory
    > held onto for Hash Join caches.
    >
    > Classes you may want to look at (or perhaps you already have?):
    > OrderedResultIterator and MappedByteBufferSortedQueue. Above a certain
    > configurable threshold (phoenix.query.spoolThresholdBytes defaults to
    > 20MB), we output results into memory mapped files. Have you tried
    > decreasing that threshold?
    >
    > Couple of JIRAs you may want to take a look at: PHOENIX-2405 (unclear if
    > this is still an issue) and PHOENIX-3289. Are you running into issues with
    > too many memory mapped files?
    >
    > —
    > You are receiving this because you were mentioned.
    > Reply to this email directly, view it on GitHub
    > <https://github.com/apache/phoenix/pull/308#discussion_r202842452>, or mute
    > the thread
    > <https://github.com/notifications/unsubscribe-auth/AD1phzirsKewFaxyFttqCr5ybJNLnvWdks5uHREqgaJpZM4U_6wx>
    > .
    >
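
    A minimal sketch of the MemoryChunk pattern described above, using the
    allocate()/resize() calls that appear in the iterator code later in this
    thread; the chunk sizes and the work inside the try block are illustrative:

        import org.apache.phoenix.compile.StatementContext;
        import org.apache.phoenix.memory.MemoryManager.MemoryChunk;

        class TrackedWork {
            void run(StatementContext context) {
                // Reserve an initial slice of globally tracked memory.
                MemoryChunk chunk = context.getConnection().getQueryServices()
                        .getMemoryManager().allocate(64 * 1024);
                try {
                    // ... build up state, growing the reservation as it expands.
                    // Growing fails the query if usage would exceed
                    // phoenix.query.maxGlobalMemoryPercentage /
                    // phoenix.query.maxTenantMemoryPercentage.
                    chunk.resize(128 * 1024);
                } finally {
                    chunk.resize(0); // release, as the iterator's close() does later
                }
            }
        }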



---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @JamesRTaylor -- ok, I will submit a new PR. Can you review my latest push and comments?


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @JamesRTaylor - I made the changes and they are ready for review. Thanks.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204197328
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions);
    +                } else {
    +                    iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +                    aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +                }
                 }
    -            aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions());
                 aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             }
     
    --- End diff --
    
    In the if statement below, you should still insert the OrderedAggregatingResultIterator if a hash aggregation is being done and orderBy != OrderBy.EMPTY_ORDER_BY:
    
        // Still sort the aggregated rows if we're using a hash aggregation and the order by was optimized out
        // since the rows won't be in GROUP BY key order
        if (orderBy.getOrderByExpressions().isEmpty() && (!useHashAgg || orderBy != OrderBy.EMPTY_ORDER_BY)) {


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss closed the pull request at:

    https://github.com/apache/phoenix/pull/308


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204193333
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    --- End diff --
    
    I tried avoiding the sort, but could not make the code work without it. In my use case, when there was an ORDER BY on the groups and the ORDER BY was on the primary key, it was not being applied; when the ORDER BY was not on the primary key, it was applied.
    
    I left the sort in because (1) it introduces no error; (2) the sorting happens after the grouping, so it is more efficient than the current solution, which sorts before the grouping; and (3) it avoids the problem above.
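
    For reference, a sketch of what the retained sortKeys() step amounts to
    (the method body is an assumption; the hash field and imports are as in
    the file above):

        // Order the grouping keys after aggregation, so downstream iterators
        // and any ORDER BY that was optimized out still see rows in GROUP BY
        // key order. ImmutableBytesWritable compares lexicographically by its
        // underlying bytes.
        private List<ImmutableBytesWritable> sortKeys() {
            List<ImmutableBytesWritable> keys = new ArrayList<>(hash.keySet());
            Collections.sort(keys);
            return keys;
        }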


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203099292
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    +            sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +    public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +    
    +    @Override
    +    public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +    public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator=" 
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private void populateHash() throws SQLException {
    --- End diff --
    
    Hi @JamesRTaylor, thanks for the review and feedback. I'm hoping to get a minimal implementation merged into master, which can then improve over time. I see that HashCacheClient throws an exception once the memory threshold is reached. Would it suffice if I did the same thing?
    
    The other examples, e.g. MappedByteBufferSortedQueue, involve linear memory.
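
    For concreteness, a sketch of the kind of check being proposed. The
    InsufficientMemoryException type is what GlobalMemoryManager surfaces on
    allocation failure; wiring it in here is an assumption:

        import org.apache.phoenix.memory.InsufficientMemoryException;

        // Abort the aggregation once the estimated hash size crosses the
        // configured threshold, instead of spilling to disk.
        private void checkThreshold(long estimatedHashBytes) {
            if (thresholdBytes > 0 && estimatedHashBytes > thresholdBytes) {
                throw new InsufficientMemoryException(
                    "Client hash aggregation exceeded threshold of "
                        + thresholdBytes + " bytes");
            }
        }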


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @JamesRTaylor - Done. I switched the PR to 4.x-HBase-1.4, attached the .patch file, and issued "Submit Patch".


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    +1. Nice work, @geraldss. @twdsilva - would you (or maybe someone else?) have some spare cycles to check this in assuming the test run comes back clean?


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203926179
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +        int hashSize = 0;
    +
    +        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
    +            ImmutableBytesWritable key = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
    +            key = getGroupingKey(result, key);
    +            Aggregator[] rowAggregators = hash.get(key);
    +            if (rowAggregators == null) {
    +                // Abort if we exceed memory threshold/2
    +                // We use threshold/2 to leave room for the subsequent sort
    +                if (thresholdBytes > 0) {
    +                    hashSize += key.getSize() + aggSize;
    --- End diff --
    
    Done.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204176238
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    --- End diff --
    
    I don't think this sort is required at all. You've already guaranteed that each row has a unique row key, so the subsequent GroupedAggregatingResultIterator should work fine (it expects duplicate rows to be adjacent, and since every row is unique, that's trivially the case). If there's an ORDER BY for the groups, then Phoenix will insert an ordering result iterator.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205206944
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions);
    +                } else {
    +                    iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +                    aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +                }
                 }
    -            aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions());
                 aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             }
     
    --- End diff --
    
    Hi @JamesRTaylor - please review. The sort is now done only when necessary. Thanks.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r206295564
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
    +                    aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, sort);
    --- End diff --
    
    Please add a comment about the reverse sort and point to the JIRA (create one if there isn't one already). It might be best to structure the code here such that once reverse sort is supported, the hash aggregate will just work.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by solzy <gi...@git.apache.org>.
Github user solzy commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r202024941
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    +            sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +    public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +    
    +    @Override
    +    public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +    public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator=" 
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private void populateHash() throws SQLException {
    --- End diff --
    
    For a large join result set, this code snippet risks an OOM on the client. Adding a memory limitation would be more graceful.


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    Thanks @JamesRTaylor for the reviews and suggestions.


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @JamesRTaylor - pushed another change per your feedback. Fingers crossed :) Thanks.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204167291
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        memoryChunk.resize(0);
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +
    +        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
    +            ImmutableBytesWritable key = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
    +            key = getGroupingKey(result, key);
    +            Aggregator[] rowAggregators = hash.get(key);
    +            if (rowAggregators == null) {
    +                long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, key.getSize(), aggSize);
    --- End diff --
    
    Looking at sizeOfMap, it assumes the key size is the same for all rows (which won't be true in this case). Instead, pass in SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE here instead of key.getSize(), and keep a separate keySize variable that accumulates each key.getSize() as keys are added here.
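    
    A minimal sketch of that accounting (the keySize field and the
    estimatedHashSize helper are illustrative names, not part of the
    committed patch; sizeOfMap and IMMUTABLE_BYTES_WRITABLE_SIZE are the
    existing SizedUtil members):
    
        // Accumulates the actual byte length of each distinct key added to
        // the hash, since sizeOfMap assumes a uniform key size per entry.
        private long keySize = 0;
    
        private long estimatedHashSize(int numEntries, int aggSize) {
            // Charge the fixed ImmutableBytesWritable wrapper size per entry,
            // plus the variable-length key bytes accumulated so far.
            return SizedUtil.sizeOfMap(numEntries,
                    SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE, aggSize) + keySize;
        }
    
        // In populateHash(), on each previously unseen key:
        //     keySize += key.getSize();
        //     long hashSize = estimatedHashSize(hash.size() + 1, aggSize);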


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204192752
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        memoryChunk.resize(0);
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +
    +        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
    +            ImmutableBytesWritable key = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
    +            key = getGroupingKey(result, key);
    +            Aggregator[] rowAggregators = hash.get(key);
    +            if (rowAggregators == null) {
    +                long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, key.getSize(), aggSize);
    --- End diff --
    
    Done.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204197232
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    --- End diff --
    
    @JamesRTaylor - I tried that as well, as part of trying to avoid this sort. That is, I tried catching this special case in ClientAggregatePlan and wrapping an OrderedAggregatingResultIterator there. It did not work, which is why I relented and implemented the sort this way.
    
    One other detail -- for this special case, the sort here is faster than the generic sort (it is less general). Bottom line: I have been unable to make the patch produce correct results without this sort.
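    
    For reference, a sketch of the shape of that special-case sort (an
    illustration, not the exact sortKeys() body in the patch): the grouping
    keys are already materialized as the hash map's key set, so they can be
    sorted directly with an unsigned byte comparison instead of going through
    the generic OrderedResultIterator machinery.
    
        private List<ImmutableBytesWritable> sortKeys() {
            // ImmutableBytesWritable compares as unsigned bytes, which
            // matches the row key order the parent plan expects.
            List<ImmutableBytesWritable> keys =
                new ArrayList<ImmutableBytesWritable>(hash.keySet());
            Collections.sort(keys);
            return keys;
        }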


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by twdsilva <gi...@git.apache.org>.
Github user twdsilva commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @geraldss 
    The patch has been committed. Can you close the PR?


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203926229
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    --- End diff --
    
    Done.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204192760
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        memoryChunk.resize(0);
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +
    +        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
    +            ImmutableBytesWritable key = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
    +            key = getGroupingKey(result, key);
    +            Aggregator[] rowAggregators = hash.get(key);
    +            if (rowAggregators == null) {
    +                long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, key.getSize(), aggSize);
    +                if (hashSize > memoryChunk.getSize() + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) {
    +                    // This will throw InsufficientMemoryException if necessary
    +                    memoryChunk.resize(hashSize + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +                }
    +
    +                // Abort if we exceed memory threshold/2
    +                // We use threshold/2 to leave room for the subsequent sort
    +                if (thresholdBytes > 0) {
    +                    if (hashSize > thresholdBytes/2) {
    --- End diff --
    
    Done, removed.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r201562154
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    --- End diff --
    
    Done.


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @JamesRTaylor - Handled reverse sort; added explanatory comments about sorting; added tests for sorting and non-sorting, including EXPLAIN.
    
    Pushed changes; uploaded new .patch file; resubmitted patch.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204495020
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions);
    +                } else {
    +                    iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +                    aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +                }
                 }
    -            aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions());
                 aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             }
     
    --- End diff --
    
    @JamesRTaylor - Done. Please review. Thanks.


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @joshelser Thanks for the review. I can make these changes now and push them, or should I wait for other reviewers?


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205230186
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
    --- End diff --
    
    Should be true if OrderBy.RVS_ROW_KEY_ORDER_BY too.
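    
    That is, the flag would become something like:
    
        boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY
                    || orderBy == OrderBy.RVS_ROW_KEY_ORDER_BY;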


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r201562225
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    --- End diff --
    
    Made the side effects more explicit.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205297065
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
    +                    aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, sort);
    --- End diff --
    
    Same as the previous comment: the GROUP BY cannot produce a reverse sort.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203868079
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +        int hashSize = 0;
    --- End diff --
    
    Instead of tracking memory here independently of other queries running in this JVM, use our memory manager. This will also automatically track per-tenant memory (which may have lower constraints than the overall allowed memory). Assuming you pass StatementContext through in the constructor, you can do something like this:
    
        MemoryChunk memoryChunk = context.getConnection().getQueryServices().getMemoryManager()
         .allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
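    
    The chunk can then be grown as the hash fills and released when the
    iterator is closed; a sketch of that lifecycle, mirroring the resize
    pattern quoted elsewhere in this thread:
    
        // While populating the hash: grow the allocation in chunk-sized
        // steps; resize() throws InsufficientMemoryException if the manager
        // cannot grant the request.
        if (hashSize > memoryChunk.getSize() + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) {
            memoryChunk.resize(hashSize + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
        }
    
        // In close(): release the allocation back to the memory manager.
        memoryChunk.resize(0);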


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204168576
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        memoryChunk.resize(0);
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +
    +        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
    +            ImmutableBytesWritable key = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
    +            key = getGroupingKey(result, key);
    +            Aggregator[] rowAggregators = hash.get(key);
    +            if (rowAggregators == null) {
    +                long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, key.getSize(), aggSize);
    +                if (hashSize > memoryChunk.getSize() + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) {
    +                    // This will throw InsufficientMemoryException if necessary
    +                    memoryChunk.resize(hashSize + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +                }
    +
    +                // Abort if we exceed memory threshold/2
    +                // We use threshold/2 to leave room for the subsequent sort
    +                if (thresholdBytes > 0) {
    +                    if (hashSize > thresholdBytes/2) {
    --- End diff --
    
    I don't think we need this since we're tracking memory in a different way now. This would put a constraint on each client-side aggregation (rather than on memory usage as a whole). Do you think that additional constraint is necessary, and if so, why? FYI, you can also set an absolute byte amount for memory usage (but this is mainly used in tests). If you want to keep this, then let's create a different config parameter in QueryServices and a default in QueryServicesOptions. The spooling threshold is not a memory constraint; it tells Phoenix when to start spooling to disk during a client sort, so it's not really appropriate to use here.
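    
    If it is kept, the plumbing could look like the sketch below (the
    attribute name and default value are hypothetical placeholders, not
    committed Phoenix constants):
    
        // In QueryServices (hypothetical attribute name):
        public static final String CLIENT_HASH_AGG_MAX_MEMORY_ATTRIB =
            "phoenix.query.clientHashAggMaxMemoryBytes";
    
        // In QueryServicesOptions (hypothetical default):
        public static final long DEFAULT_CLIENT_HASH_AGG_MAX_MEMORY = 100L * 1024L * 1024L;
    
        // In ClientAggregatePlan, read it the same way the spool threshold
        // is read today:
        long maxHashAggBytes = context.getConnection().getQueryServices().getProps().getLong(
            QueryServices.CLIENT_HASH_AGG_MAX_MEMORY_ATTRIB,
            QueryServicesOptions.DEFAULT_CLIENT_HASH_AGG_MAX_MEMORY);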


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204161104
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        memoryChunk.resize(0);
    --- End diff --
    
    Instead of the memoryChunk.resize(0), call memoryChunk.close(). Best to do the other close calls in nested finally blocks.
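    For example, a minimal sketch of the close() shape being suggested, assuming just the two resources visible in this thread (the memory chunk and the wrapped iterator):
    
        @Override
        public void close() throws SQLException {
            keyIterator = null;
            keyList = null;
            hash = null;
            try {
                memoryChunk.close();
            } finally {
                resultIterator.close();
            }
        }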


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203926206
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +        int hashSize = 0;
    --- End diff --
    
    Done.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r206295347
  
    --- Diff: phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.end2end;
    +
    +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.Statement;
    +import java.util.Properties;
    +
    +import org.apache.phoenix.util.PropertiesUtil;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.junit.Test;
    +
    +public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
    --- End diff --
    
    Unless I'm missing something, this needs a few more tests:
    * verify CLIENT SORTED BY is present in explain plan when sort required for hash aggregate (a rough sketch follows below)
    * verify it's not in explain plan when sort not required
    * verify query results when sort not required
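    A rough sketch of that first check, in the style of the other tests in this file (the table names, query, and hint placement are placeholders, not the actual test data):
    
        @Test
        public void testExplainShowsSortForHashAggregate() throws Exception {
            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
            Connection conn = DriverManager.getConnection(getUrl(), props);
            try {
                String query = "SELECT /*+ USE_SORT_MERGE_JOIN HASH_AGGREGATE */"
                    + " t1.k, COUNT(*) FROM t1 JOIN t2 ON t1.k = t2.k GROUP BY t1.k";
                ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
                String plan = QueryUtil.getExplainPlan(rs);
                assertTrue(plan.contains("CLIENT HASH AGGREGATE INTO DISTINCT ROWS"));
                assertTrue(plan.contains("CLIENT SORTED BY"));
            } finally {
                conn.close();
            }
        }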


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205296815
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
    --- End diff --
    
    The GROUP BY cannot cause REV_ROW_KEY_ORDER_BY, because the GROUP BY cannot specify or produce descending keys. This is a pre-existing assumption and design in SORT_MERGE_JOIN: the SORT_MERGE_JOIN creates its own forward sort, and its Tracker reports a forward sort.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203882988
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashSizeException.java ---
    @@ -0,0 +1,33 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.phoenix.iterate;
    +
    +/**
    + * Thrown by {@link org.apache.phoenix.iterate.ClientHashAggregatingResultIterator } when
    + * hash size exceeds memory threshold.
    + * 
    + */
    +public class ClientHashSizeException extends RuntimeException {
    --- End diff --
    
    You won't need this, as an InsufficientMemoryException will be thrown if you go above the specified memory limit (based on existing Phoenix config properties), and it will be unwound into a SQLException with the code SQLExceptionCode.INSUFFICIENT_MEMORY.
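    To a client, that surfaces as a plain SQLException whose error code can be checked; a quick sketch (assuming the standard SQLExceptionCode accessor):
    
        try {
            ResultSet rs = stmt.executeQuery(query);
            while (rs.next()) { /* consume results */ }
        } catch (SQLException e) {
            if (e.getErrorCode() == SQLExceptionCode.INSUFFICIENT_MEMORY.getErrorCode()) {
                // the aggregation exceeded the configured memory limits
            }
        }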


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r201465583
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    --- End diff --
    
    Could consolidate this down into `Objects.requireNonNull(resultIterator)` from java.util.Objects.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r202112790
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    +            sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +    public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +    
    +    @Override
    +    public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +    public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator=" 
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private void populateHash() throws SQLException {
    --- End diff --
    
    Hi @solzy, thanks for the review. My understanding of the pre-existing code is that classes like OrderedResultIterator do not handle OOM and do not enforce any memory limit. I don't want to introduce a mechanism that is specific to this feature. Is there any place where the client handles OOM that could be used as a reference?


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r202842452
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    +            sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +    public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +    
    +    @Override
    +    public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +    public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator=" 
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private void populateHash() throws SQLException {
    --- End diff --
    
    @geraldss - memory management is tracked by the GlobalMemoryManager. Operations that potentially use memory allocate (and eventually free) a set of MemoryChunk instances. You can see an example of this in GroupedAggregateRegionObserver (the runtime code for aggregation). If the memory used goes over a threshold (phoenix.query.maxGlobalMemoryPercentage and phoenix.query.maxTenantMemoryPercentage, the percentage of the Java heap that all queries together are allowed to use), then the query will fail. This mechanism is used most often on the server side, since we don't typically use a lot of memory on the client side (we're mostly doing merge joins). One example where we do use it on the client side is our broadcast join implementation (see HashCacheClient), to track the memory held for hash join caches.
    
    Classes you may want to look at (or perhaps you already have?): OrderedResultIterator and MappedByteBufferSortedQueue. Above a certain configurable threshold (phoenix.query.spoolThresholdBytes, which defaults to 20MB), we output results into memory-mapped files. Have you tried decreasing that threshold?
    
    A couple of JIRAs you may want to take a look at: PHOENIX-2405 (unclear if this is still an issue) and PHOENIX-3289. Are you running into issues with too many memory-mapped files?
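    Roughly, the allocation pattern looks like this (sizes arbitrary; the allocate/resize/close calls are the ones used elsewhere in this patch):
    
        MemoryChunk chunk = context.getConnection().getQueryServices()
            .getMemoryManager().allocate(64 * 1024);  // initial reservation
        try {
            // Growing the chunk asks the GlobalMemoryManager for more; this
            // throws InsufficientMemoryException if the configured heap
            // percentage would be exceeded across all running queries.
            chunk.resize(1024 * 1024);
            // ... memory-intensive work ...
        } finally {
            chunk.close();  // return the reservation to the pool
        }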


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203867850
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
    +        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +        int hashSize = 0;
    +
    +        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
    +            ImmutableBytesWritable key = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
    +            key = getGroupingKey(result, key);
    +            Aggregator[] rowAggregators = hash.get(key);
    +            if (rowAggregators == null) {
    +                // Abort if we exceed memory threshold/2
    +                // We use threshold/2 to leave room for the subsequent sort
    +                if (thresholdBytes > 0) {
    +                    hashSize += key.getSize() + aggSize;
    --- End diff --
    
    Track memory used as you're doing (but use SizedUtil.sizeOfMap() to account for the overhead of the HashMap), then call memoryChunk.resize(hashSize) when it grows beyond another CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE. The reason to do it this way is that if there are lots of other threads running memory-intensive queries, you'll get a failure earlier (before a potential OOM error, which is not recoverable, can happen).
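    For reference, this is essentially the shape the later revision of the patch (quoted earlier in this thread) arrives at inside populateHash(), when a new group key is added:
    
        long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, key.getSize(), aggSize);
        if (hashSize > memoryChunk.getSize() + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) {
            // Grow in CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE steps rather than per
            // row; resize() throws InsufficientMemoryException if the global
            // memory limit would be exceeded.
            memoryChunk.resize(hashSize + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
        }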


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205229353
  
    --- Diff: phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.end2end;
    +
    +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.Statement;
    +import java.util.Properties;
    +
    +import org.apache.phoenix.util.PropertiesUtil;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.junit.Test;
    +
    +public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
    --- End diff --
    
    Add tests for when a sort (forward & reverse) is required for the hash aggregate.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204192739
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        memoryChunk.resize(0);
    --- End diff --
    
    Done.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r204196968
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.compile.StatementContext;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.SizedUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private final MemoryChunk memoryChunk;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
    +                                               List<Expression> groupByExpressions, int thresholdBytes) {
    +
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    --- End diff --
    
    See my comment in ClientAggregatePlan.java. I believe you can detect this corner case there and avoid this sort. It's a little weird to have a hash aggregation that still does a sort (but I get your point that it's better than doing the sort beforehand).


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203879864
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    --- End diff --
    
    Pass the context through here too, to ClientGroupedAggregatingResultIterator, as you'll need it to get the memory manager.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205232207
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -183,13 +198,15 @@ public ExplainPlan getExplainPlan() throws SQLException {
             if (where != null) {
                 planSteps.add("CLIENT FILTER BY " + where.toString());
             }
    -        if (!groupBy.isEmpty()) {
    -            if (!groupBy.isOrderPreserving()) {
    -                planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString());
    -            }
    +        if (groupBy.isEmpty()) {
    +            planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
    +        } else if (groupBy.isOrderPreserving()) {
                 planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
    +        } else if (useHashAgg) {
    +            planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
    --- End diff --
    
    Add a CLIENT SORTED BY line here if sorting is required for the hash aggregate.
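    
    That is, something along these lines, assuming the same sort condition
    used in iterator():
    
        } else if (useHashAgg) {
            planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY "
                    + groupBy.getExpressions().toString());
            // Surface the post-aggregation sort in the plan when one happens.
            if (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY) {
                planSteps.add("CLIENT SORTED BY "
                        + groupBy.getKeyExpressions().toString());
            }
        }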


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by joshelser <gi...@git.apache.org>.
Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r201468535
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    --- End diff --
    
    A comment here would be nice to note the side-effect that `hash` and `keyList` are guaranteed to be non-null (and thus, the lack of the defensive null-checks below is OK).
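    
    For example (the wording of the comment is just a suggestion):
    
        if (keyIterator == null) {
            // populateHash() and sortKeys() initialize hash and keyList as a
            // side effect, so both are guaranteed non-null past this point and
            // the lookups below need no defensive null checks.
            populateHash();
            sortKeys();
            keyIterator = keyList.iterator();
        }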


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205296128
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -183,13 +198,15 @@ public ExplainPlan getExplainPlan() throws SQLException {
             if (where != null) {
                 planSteps.add("CLIENT FILTER BY " + where.toString());
             }
    -        if (!groupBy.isEmpty()) {
    -            if (!groupBy.isOrderPreserving()) {
    -                planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString());
    -            }
    +        if (groupBy.isEmpty()) {
    +            planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
    +        } else if (groupBy.isOrderPreserving()) {
                 planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
    +        } else if (useHashAgg) {
    +            planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
    --- End diff --
    
    Done.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r205231380
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
    @@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
                 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
                 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
             } else {
    -            if (!groupBy.isOrderPreserving()) {
    -                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
    -                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
    -                List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            List<Expression> keyExpressions = groupBy.getKeyExpressions();
    +            if (groupBy.isOrderPreserving()) {
    +                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
    +            } else {
    +                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
    +                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                     List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                     for (Expression keyExpression : keyExpressions) {
                         keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
                     }
    -                iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
    +
    +                if (useHashAgg) {
    +                    boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
    +                    aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, sort);
    --- End diff --
    
    You’ll need to pass through whether the scan is forward or reverse. You might just pass orderBy through instead of the boolean. Better still would be to let the code below insert an ordering result iterator, so you wouldn’t need the sort logic in your new iterator at all.
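    
    A sketch of the first option, passing orderBy through (the signature
    change is illustrative, not the patch's): since ImmutableBytesWritable is
    Comparable, both scan directions reduce to a sort direction inside the
    iterator.
    
        aggResultIterator = new ClientHashAggregatingResultIterator(
                context, iterator, serverAggregators, keyExpressions, orderBy);
    
        // Inside ClientHashAggregatingResultIterator, with orderBy saved
        // by the constructor:
        private List<ImmutableBytesWritable> sortKeys() {
            List<ImmutableBytesWritable> keys = new ArrayList<>(hash.keySet());
            if (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY) {
                Collections.sort(keys);                             // forward scan
            } else if (orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) {
                Collections.sort(keys, Collections.reverseOrder()); // reverse scan
            }
            return keys;
        }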


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by solzy <gi...@git.apache.org>.
Github user solzy commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r202593990
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    +            sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +    public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +    
    +    @Override
    +    public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +    public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator=" 
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private void populateHash() throws SQLException {
    --- End diff --
    
    @geraldss Sorry for the late reply. I did not find any handling for OOM on the client, which will affect stability when this is used via the QueryServer. Anyway, we can improve these features in the future. Thanks for your response, and good job.


---

[GitHub] phoenix pull request #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203467413
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
    +        if (resultIterator == null) throw new NullPointerException();
    +        if (aggregators == null) throw new NullPointerException();
    +        if (groupByExpressions == null) throw new NullPointerException();
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +    }
    +
    +    @Override
    +    public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            populateHash();
    +            sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +    public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +    
    +    @Override
    +    public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +    public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator=" 
    +            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
    +    }
    +
    +    private void populateHash() throws SQLException {
    --- End diff --
    
    @JamesRTaylor @solzy I have added a memory limitation based on SPOOL_THRESHOLD_BYTES. It throws an exception similar to the one HashCacheClient throws. Please review. Thanks.
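    
    In outline, the check inside populateHash() is shaped like this (a sketch,
    not the exact patch code; getEstimatedByteSize() is assumed as the per-row
    estimate, and the exact accounting and message may differ):
    
        long totalBytes = 0;
        // ... for each input row, after aggregating into the hash:
        totalBytes += key.getLength() + aggregators.getEstimatedByteSize();
        if (totalBytes > thresholdBytes) {
            // Fail fast, as HashCacheClient does, rather than running the
            // client out of memory.
            throw new SQLException("Client hash aggregation exceeded memory threshold of "
                    + thresholdBytes + " bytes");
        }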


---

[GitHub] phoenix issue #308: Client-side hash aggregation

Posted by geraldss <gi...@git.apache.org>.
Github user geraldss commented on the issue:

    https://github.com/apache/phoenix/pull/308
  
    @twdsilva - PR closed now.


---