You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2019/10/22 18:30:20 UTC

[GitHub] [phoenix] BinShi-SecularBird commented on a change in pull request #603: PHOENIX-5478 IndexTool mapper task should not timeout

BinShi-SecularBird commented on a change in pull request #603: PHOENIX-5478 IndexTool mapper task should not timeout
URL: https://github.com/apache/phoenix/pull/603#discussion_r337681191
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 ##########
 @@ -1087,116 +1087,140 @@ private static PTable deserializeTable(byte[] b) {
             throw new RuntimeException(e);
         }
     }
-    
-    private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
-            Configuration config) throws IOException {
-        byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-        boolean useProto = true;
-        // for backward compatibility fall back to look up by the old attribute
-        if (indexMetaData == null) {
-            useProto = false;
-            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+    private class IndexRebuildRegionScanner extends BaseRegionScanner {
+        private long pageSizeInRows = Long.MAX_VALUE;
+        private int rowCount = 0;
+        private boolean hasMore;
+        private final int maxBatchSize;
+        private MutationList mutations;
+        private final long maxBatchSizeBytes;
+        private final long blockingMemstoreSize;
+        private final byte[] clientVersionBytes;
+        private List<Cell> results = new ArrayList<Cell>();
+        private byte[] indexMetaData;
+        private boolean useProto = true;
+        private Scan scan;
+        private RegionScanner innerScanner;
+        final Region region;
+
+        IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
+                                   final Configuration config) {
+            super(innerScanner);
+            if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+                pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+            }
+
+            maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            mutations = new MutationList(maxBatchSize);
+            maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+            blockingMemstoreSize = getBlockingMemstoreSize(region, config);
+            clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+            if (indexMetaData == null) {
+                useProto = false;
+                indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            }
+            this.scan = scan;
+            this.innerScanner = innerScanner;
+            this.region = region;
         }
-        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
-        boolean hasMore;
-        int rowCount = 0;
-        try {
-            int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-            final long blockingMemstoreSize = getBlockingMemstoreSize(region, config);
-            MutationList mutations = new MutationList(maxBatchSize);
-            region.startRegionOperation();
-            byte[] uuidValue = ServerCacheClient.generateId();
-            synchronized (innerScanner) {
-                do {
-                    List<Cell> results = new ArrayList<Cell>();
-                    hasMore = innerScanner.nextRaw(results);
-                    if (!results.isEmpty()) {
-                        Put put = null;
-                        Delete del = null;
-                        for (Cell cell : results) {
-
-                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
-                                if (put == null) {
-                                    put = new Put(CellUtil.cloneRow(cell));
-                                    put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-                                    put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-                                    put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-                                    mutations.add(put);
-                                    // Since we're replaying existing mutations, it makes no sense to write them to the wal
-                                    put.setDurability(Durability.SKIP_WAL);
-                                }
-                                put.add(cell);
-                            } else {
-                                if (del == null) {
-                                    del = new Delete(CellUtil.cloneRow(cell));
-                                    del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-                                    del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-                                    del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-                                    mutations.add(del);
-                                    // Since we're replaying existing mutations, it makes no sense to write them to the wal
-                                    del.setDurability(Durability.SKIP_WAL);
+        @Override
+        public RegionInfo getRegionInfo() {
+            return region.getRegionInfo();
+        }
+
+        @Override
+        public boolean isFilterDone() { return hasMore; }
+
+        @Override
+        public void close() throws IOException { innerScanner.close(); }
+
+        @Override
+        public boolean next(List<Cell> results) throws IOException {
+            try {
+                byte[] uuidValue = ServerCacheClient.generateId();
+                synchronized (this) {
 
 Review comment:
  Should this be synchronized(this.innerScanner)? It seems that doPostScannerOpen(), which calls rebuildIndices(), requires access to the innerScanner object to be synchronized (the removed rebuildIndices() code did this with "synchronized (innerScanner)"). By using "synchronized (this)", you synchronize access to the IndexRebuildRegionScanner object, but that does not prevent this.innerScanner from being accessed concurrently.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services