You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2020/03/28 00:20:15 UTC

[GitHub] [phoenix] dbwong commented on a change in pull request #746: PHOENIX-5802: Connection leaks in UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator not being closed

dbwong commented on a change in pull request #746: PHOENIX-5802: Connection leaks in UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator not being closed
URL: https://github.com/apache/phoenix/pull/746#discussion_r399591638
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
 ##########
 @@ -50,62 +51,80 @@ protected MutatingParallelIteratorFactory(PhoenixConnection connection) {
     /**
      * Method that does the actual mutation work
      */
-    abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+    abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator,
+            PhoenixConnection connection) throws SQLException;
     
     @Override
-    public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan plan) throws SQLException {
+    public PeekingResultIterator newIterator(final StatementContext parentContext,
+            ResultIterator iterator, Scan scan, String tableName,
+            QueryPlan plan) throws SQLException {
+
         final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
-        
-        MutationState state = mutate(parentContext, iterator, clonedConnection);
-        
-        final long totalRowCount = state.getUpdateCount();
-        final boolean autoFlush = connection.getAutoCommit() || plan.getTableRef().getTable().isTransactional();
-        if (autoFlush) {
-            clonedConnection.getMutationState().join(state);
-            state = clonedConnection.getMutationState();
-        }
-        final MutationState finalState = state;
-        
-        byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
-        KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
-        final Tuple tuple = new SingleKeyValueTuple(keyValue);
-        return new PeekingResultIterator() {
-            private boolean done = false;
-            
-            @Override
-            public Tuple next() throws SQLException {
-                if (done) {
-                    return null;
-                }
-                done = true;
-                return tuple;
-            }
+        try {
+            MutationState state = mutate(parentContext, iterator, clonedConnection);
 
-            @Override
-            public void explain(List<String> planSteps) {
+            final long totalRowCount = state.getUpdateCount();
+            final boolean autoFlush = connection.getAutoCommit() ||
+                    plan.getTableRef().getTable().isTransactional();
+            if (autoFlush) {
+                clonedConnection.getMutationState().join(state);
+                state = clonedConnection.getMutationState();
             }
+            final MutationState finalState = state;
+
+            byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
+            KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+                    SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+            final Tuple tuple = new SingleKeyValueTuple(keyValue);
+            return new PeekingResultIterator() {
+                private boolean done = false;
 
-            @Override
-            public void close() throws SQLException {
-                try {
-                    /* 
-                     * Join the child mutation states in close, since this is called in a single threaded manner
-                     * after the parallel results have been processed. 
-                     * If auto-commit is on for the cloned child connection, then the finalState here is an empty mutation 
-                     * state (with no mutations). However, it still has the metrics for mutation work done by the 
-                     * mutating-iterator. Joining the mutation state makes sure those metrics are passed over
-                     * to the parent connection.
-                     */ 
-                    MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
-                } finally {
-                    clonedConnection.close();
+                @Override
+                public Tuple next() {
+                    if (done) {
+                        return null;
+                    }
+                    done = true;
+                    return tuple;
                 }
-            }
 
-            @Override
-            public Tuple peek() throws SQLException {
-                return done ? null : tuple;
+                @Override
+                public void explain(List<String> planSteps) {
+                }
+
+                @Override
+                public void close() throws SQLException {
+                    try {
+                        /*
+                         * Join the child mutation states in close, since this is called in a single
+                         * threaded manner after the parallel results have been processed.
+                         * If auto-commit is on for the cloned child connection, then the finalState
+                         * here is an empty mutation state (with no mutations). However, it still
+                         * has the metrics for mutation work done by the mutating-iterator.
+                         * Joining the mutation state makes sure those metrics are passed over
+                         * to the parent connection.
+                         */
+                        MutatingParallelIteratorFactory.this.connection.getMutationState()
+                                .join(finalState);
+                    } finally {
+                        clonedConnection.close();
+                    }
+                }
+
+                @Override
+                public Tuple peek() {
+                    return done ? null : tuple;
+                }
+            };
+        } catch (Throwable ex) {
+            // Catch just to make sure we close the cloned connection and then rethrow
+            try {
+                // closeQuietly only handles IOException
+                clonedConnection.close();
+            } catch (SQLException ignore) {
+                // ignore
 
 Review comment:
   Do you think we should log the ignored SQLException here, rather than silently swallowing a failure to close the cloned connection?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services