You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2017/09/13 23:23:30 UTC

[12/15] incubator-trafodion git commit: [TRAFODION-2733] Provide an improved memory quota assignment for big memory operators (BMO)

[TRAFODION-2733] Provide an improved memory quota assignment for big memory operators (BMO)

Added SplitBanceHelper.java. This file was deleted inadvertently


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/fd3275c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/fd3275c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/fd3275c8

Branch: refs/heads/master
Commit: fd3275c8ca90c5ce99690103b4711cbe1164b291
Parents: eece870
Author: selvaganesang <se...@esgyn.com>
Authored: Sat Sep 9 04:24:42 2017 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Sat Sep 9 04:24:42 2017 +0000

----------------------------------------------------------------------
 .../transactional/SplitBalanceHelper.java       | 394 +++++++++++++++++++
 1 file changed, 394 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/fd3275c8/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
new file mode 100644
index 0000000..989415b
--- /dev/null
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
@@ -0,0 +1,394 @@
+/**
+ * * @@@ START COPYRIGHT @@@
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements.  See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership.  The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License.  You may obtain a copy of the License at
+ * *
+ * *   http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied.  See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ * *
+ * * @@@ END COPYRIGHT @@@
+ * **/
+
+package org.apache.hadoop.hbase.coprocessor.transactional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder;
+import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+public class SplitBalanceHelper {
+    private static final Log LOG = LogFactory.getLog(SplitBalanceHelper.class);
+
+    private Path flushPath;
+
+    private static String zkTable;
+    private static String zSplitBalPath = TrxRegionObserver.zTrafPath + "splitbalance/";
+    private static String zSplitBalPathNoSlash = TrxRegionObserver.zTrafPath + "splitbalance";
+    private static String SPLIT = "SPLIT";
+    private static String BALANCE = "BALANCE";
+    private static final String FLUSH_PATH = "traf.txn.out";
+    private static AtomicBoolean needsCleanup = new AtomicBoolean(true);
+    private String balancePath;
+    private String splitPath;
+    private String regionPath;
+
+    private ZooKeeperWatcher zkw;
+    private HRegionInfo hri;
+    private HRegion region;
+    private String tablename;
+
+    public SplitBalanceHelper(HRegion my_Region, ZooKeeperWatcher zkw, Configuration conf) {
+
+        String parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                                      HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+        SplitBalanceHelper.zkTable = parentZNode + "/table";
+        if(LOG.isDebugEnabled()) LOG.debug("zkTable value: " + SplitBalanceHelper.zkTable);
+
+        String fileName = FLUSH_PATH + getTimeStamp();
+        this.region = my_Region;
+        this.hri = my_Region.getRegionInfo();
+        this.zkw = zkw;
+        this.tablename = my_Region.getTableDesc().getNameAsString();
+        try {
+            if (ZKUtil.checkExists(zkw, zSplitBalPathNoSlash) == -1) {
+                if (LOG.isDebugEnabled()) LOG.debug("HELPER create with parents");
+                ZKUtil.createWithParents(zkw, zSplitBalPathNoSlash);
+            }
+        } catch (KeeperException ke) {
+            LOG.error("ERROR: Zookeeper exception: " + ke);
+        }
+        this.flushPath = new Path(region.getRegionFileSystem().getRegionDir(), fileName);
+        regionPath = zSplitBalPath + this.tablename + "/" + hri.getEncodedName();
+        balancePath = regionPath + "/" + BALANCE + "/";
+        splitPath = regionPath + "/" + SPLIT + "/";
+
+        if (SplitBalanceHelper.needsCleanup.compareAndSet(true, false)) {
+            zkCleanup();
+        }
+    }
+
+    public Path getPath() {
+        return flushPath;
+    }
+
+    public boolean getSplit() {
+        return getSplit(null);
+    }
+
+    public boolean getSplit(StringBuilder path) {
+        try {
+            byte[] splPath = ZKUtil.getData(zkw, splitPath.substring(0, splitPath.length() - 1));
+            if (splPath == null) {
+                return false;
+            } else {
+                if (path != null)
+                    path.append(splPath.toString());
+                if (LOG.isDebugEnabled()) LOG.debug("Split information retrieved, path is: " + splPath.toString());
+                return true;
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled()) LOG.error("Keeper exception: " + e);
+            return false;
+        }
+    }
+
+    public void setSplit(HRegion leftRegion, HRegion rightRegion) throws IOException {
+        String zLeftKey = zSplitBalPath + leftRegion.getRegionInfo().getEncodedName();
+        String zRightKey = zSplitBalPath + rightRegion.getRegionInfo().getEncodedName();
+
+        try {
+            if (ZKUtil.checkExists(zkw, balancePath.substring(0, balancePath.length() - 1)) != -1) {
+                clearBalance();
+            }
+            if (LOG.isDebugEnabled()) LOG.debug("Split checking for left key ");
+            if (ZKUtil.checkExists(zkw, zLeftKey) == -1) {
+                if (LOG.isDebugEnabled()) LOG.debug("Split creating left key with parents");
+                ZKUtil.createWithParents(zkw, zLeftKey);
+            }
+            if (LOG.isDebugEnabled()) LOG.debug("Split checking for right key ");
+            if (ZKUtil.checkExists(zkw, zRightKey) == -1) {
+                if (LOG.isDebugEnabled()) LOG.debug("Split creating right key with parents");
+                ZKUtil.createWithParents(zkw, zRightKey);
+            }
+            if (LOG.isDebugEnabled()) LOG.debug("Split createAndFailSilent for left key ");
+            ZKUtil.createAndFailSilent(zkw, zLeftKey + "/" + SPLIT, Bytes.toBytes(flushPath.toString()));
+            if (LOG.isDebugEnabled()) LOG.debug("Split createAndFailSilent for right key ");
+            ZKUtil.createAndFailSilent(zkw, zRightKey + "/" + SPLIT, Bytes.toBytes(flushPath.toString()));
+            if (LOG.isDebugEnabled()) LOG.debug("Split coordination node written for " + leftRegion.getRegionInfo().getRegionNameAsString() + " and " + rightRegion.getRegionInfo().getRegionNameAsString());
+        } catch (KeeperException ke) {
+            LOG.error("ERROR: Zookeeper exception: " + ke);
+        }
+    }
+
+    public void setSplit() {
+
+        try {
+            if (ZKUtil.checkExists(zkw, balancePath.substring(0, balancePath.length() - 1)) != -1) {
+                clearBalance();
+            }
+            if (ZKUtil.checkExists(zkw, splitPath.substring(0, splitPath.length() - 1)) == -1) {
+                ZKUtil.createWithParents(zkw, splitPath.substring(0, splitPath.length() - 1));
+            }
+            ZKUtil.createSetData(zkw, splitPath.substring(0, splitPath.length() - 1), Bytes.toBytes(flushPath.toString()));
+            if (LOG.isDebugEnabled()) LOG.debug("Setting split coordination node for " + hri.getRegionNameAsString());
+        } catch (KeeperException ke) {
+            LOG.error("ERROR: Zookeeper exception: " + ke);
+        }
+    }
+
+    public void clearSplit() {
+        if (LOG.isTraceEnabled()) LOG.trace("clearSplit called for region: " + this.hri.getRegionNameAsString());
+        try {
+            ZKUtil.deleteNodeRecursively(zkw, regionPath);
+        } catch (KeeperException ke) {
+            LOG.error("Zookeeper exception: " + ke);
+        }
+    }
+
+    public boolean getBalance(StringBuilder path) {
+        try {
+            byte[] balPath = ZKUtil.getData(zkw, balancePath.substring(0, balancePath.length() - 1));
+            if (balPath == null)
+                return false;
+            else {
+                path.append(new String(balPath));
+                if (LOG.isDebugEnabled()) LOG.debug("Balance information retrieved, path is: " + new String(balPath));
+                return true;
+            }
+        } catch (Exception e) {
+            if (LOG.isErrorEnabled())
+                LOG.error("Keeper exception: " + e);
+        }
+        return true;
+    }
+
+    public void setBalance() throws IOException {
+
+        try {
+            if (ZKUtil.checkExists(zkw, splitPath.substring(0, splitPath.length() - 1)) != -1) {
+                throw new IOException("SPLIT node already exists when trying to add BALANCE node");
+            }
+
+            if (ZKUtil.checkExists(zkw, balancePath.substring(0, balancePath.length() - 1)) == -1) {
+                if (LOG.isDebugEnabled()) LOG.debug("setBalance createWithParents balancePath");
+                ZKUtil.createWithParents(zkw, balancePath.substring(0, balancePath.length() - 1));
+            }
+            ZKUtil.createSetData(zkw, balancePath.substring(0, balancePath.length() - 1), Bytes.toBytes(flushPath.toString()));
+            if (LOG.isDebugEnabled()) LOG.debug("Setting balance coordination node for " + hri.getRegionNameAsString());
+
+        } catch (KeeperException ke) {
+            LOG.error("ERROR: Zookeeper exception: " + ke);
+        }
+    }
+
+    public void clearBalance() {
+        if (LOG.isTraceEnabled()) LOG.trace("clearBalance called for region: " + this.hri.getRegionNameAsString());
+        try {
+            ZKUtil.deleteNodeRecursively(zkw, regionPath);
+        } catch (KeeperException ke) {
+            LOG.error("Zookeeper exception: " + ke);
+        }
+    }
+
+    private long getTimeStamp() {
+        return System.currentTimeMillis();
+    }
+
+    protected boolean pendingListClear(Set<TrxTransactionState> commitPendingTransactions) throws IOException {
+        if (commitPendingTransactions.isEmpty()) {
+            if (LOG.isDebugEnabled())
+                LOG.debug("pendingListClear is true because commitPendingTransactions is empty " + hri.getRegionNameAsString());
+            return true;
+        } else {
+            // Check to see if all of the TrxTransaction state objects
+            // have dropTable Recorded, in which case the pending list is
+            // considered clear of pending list.
+            for (TrxTransactionState transactionState : commitPendingTransactions) {
+                // if even one transaction state does not have drop table recorded
+                // then pendingList is not yet clear.
+                if (!transactionState.dropTableRecorded()) {
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("pendingListClear is false commitPendingTransactions is not empty "
+                                + hri.getRegionNameAsString());
+                    return false;
+                }
+            }
+            // Reaching here means pendingListClear.
+            LOG.info("pendingListClear is true because dropTableRecorded is true " + hri.getRegionNameAsString());
+            return true;
+        }
+    }
+
+    //Returning true indicates scannerList is Clear.
+    protected boolean scannersListClear(ConcurrentHashMap<Long, TransactionalRegionScannerHolder> scanners,
+    									ConcurrentHashMap<String, TrxTransactionState> transactionsById) throws IOException {
+    	  if(scanners.isEmpty()) 
+    	  {
+    	  	if (LOG.isDebugEnabled()) LOG.debug("scannersListClear Scanners is empty: " + hri.getRegionNameAsString());
+    	  	return true;
+    	  }
+    	  else
+    	  {
+    	  	if (LOG.isDebugEnabled()) LOG.debug("scannersListClear Scanners is not empty: " + hri.getRegionNameAsString());
+    	  	Iterator<Map.Entry<Long, TransactionalRegionScannerHolder>> scannerIter = scanners.entrySet().iterator();
+    	  	TransactionalRegionScannerHolder rsh = null;
+          Map.Entry<Long, TransactionalRegionScannerHolder> entry;
+    	  	while(scannerIter.hasNext())
+    	  	{
+    	  		entry = scannerIter.next();
+            rsh = entry.getValue();
+            if (rsh != null)
+            {
+            	if (LOG.isDebugEnabled()) LOG.debug("scannersListClear Active Scanner is: "+ rsh.scannerId +
+            			" Txid: "+ rsh.transId + " Region: " + hri.getRegionNameAsString());
+              String key = hri.getRegionNameAsString() + rsh.transId;
+              TrxTransactionState trxState = transactionsById.get(key);
+              
+              //if trxState is present means there is activity with this region.
+              //Hence don't return true.
+              if(trxState != null)
+              {
+             		LOG.info("scannersListClear Active Scanner found, ScannerId: " + 
+              				 rsh.scannerId + " Txid: "+ rsh.transId + " Region: " + hri.getRegionNameAsString());
+              	return false;
+          			
+              }
+            }
+    	  	}
+    	  	//Reaching here means, there is no active scanner.
+    	  	return true;
+    	  }
+    }
+
+    protected void pendingWait(Set<TrxTransactionState> commitPendingTransactions, int pendingDelayLen) throws IOException {
+        int count = 1;
+        while (!pendingListClear(commitPendingTransactions)) {
+            try {
+                if (LOG.isDebugEnabled()) LOG.debug("pendingWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+                Thread.sleep(pendingDelayLen);
+            } catch (InterruptedException e) {
+                String error = "Problem while calling sleep() on pendingWait delay, " + e;
+                if (LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on preSplit delay, returning. " + e);
+                throw new IOException(error);
+            }
+        }
+    }
+
+    /*
+    protected void scannersWait(ConcurrentHashMap<Long, TransactionalRegionScannerHolder> scanners, int pendingDelayLen)
+            throws IOException {
+        int count = 1;
+        while (!scannersListClear(scanners)) {
+            try {
+                if (LOG.isDebugEnabled()) LOG.debug("scannersWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+                Thread.sleep(pendingDelayLen);
+            } catch (InterruptedException e) {
+                String error = "Problem while calling sleep() on scannersWait delay, " + e;
+                if (LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on preSplit delay, returning. " + e);
+                throw new IOException(error);
+            }
+        }
+    }
+    */
+
+    protected void pendingAndScannersWait(Set<TrxTransactionState> commitPendingTransactions,
+            ConcurrentHashMap<Long, TransactionalRegionScannerHolder> scanners,
+            ConcurrentHashMap<String, TrxTransactionState> transactionsById, int pendingDelayLen) throws IOException {
+        int count = 1;
+        while (!scannersListClear(scanners, transactionsById) || !pendingListClear(commitPendingTransactions)) {
+            try {
+                if (LOG.isDebugEnabled()) LOG.debug("pendingAndScannersWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+                Thread.sleep(pendingDelayLen);
+            } catch (InterruptedException e) {
+                String error = "Problem while calling sleep() on pendingAndScannersWait delay, " + e;
+                if (LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on pendingAndScannersWait delay, returning. " + e);
+                throw new IOException(error);
+            }
+        }
+    }
+
+    protected void activeWait(ConcurrentHashMap<String, TrxTransactionState> transactionsById, int activeDelayLen,
+            int splitDelayLimit) throws IOException {
+        int counter = 0;
+        int minutes = 0;
+        int currentMin = 0;
+
+        boolean delayMsg = false;
+        while (!transactionsById.isEmpty()) {
+            try {
+                delayMsg = true;
+                Thread.sleep(activeDelayLen);
+                counter++;
+                currentMin = (counter * activeDelayLen) / 60000;
+
+                if (currentMin > minutes) {
+                    minutes = currentMin;
+                    if (LOG.isInfoEnabled()) LOG.info("Delaying split due to transactions present. Delayed : " + minutes + " minute(s) on "
+                                + hri.getRegionNameAsString());
+                }
+                if (minutes >= splitDelayLimit) {
+                    if (LOG.isWarnEnabled()) LOG.warn("Surpassed split delay limit of " + splitDelayLimit + " minutes. Continuing with split");
+                    delayMsg = false;
+                    break;
+                }
+            } catch (InterruptedException e) {
+                String error = "Problem while calling sleep() on preSplit delay - activeWait: " + e;
+                if (LOG.isErrorEnabled()) LOG.error(error);
+                throw new IOException(error);
+            }
+        }
+        if (delayMsg) {
+            if (LOG.isWarnEnabled()) LOG.warn("Continuing with split operation, no active transactions on: " + hri.getRegionNameAsString());
+        }
+    }
+
+    protected void zkCleanup() {
+        if (LOG.isTraceEnabled()) LOG.trace("zkCleanup -- ENTRY");
+        try {
+            List<String> trafTables = ZKUtil.listChildrenNoWatch(zkw, zSplitBalPathNoSlash);
+            List<String> hbaseTables = ZKUtil.listChildrenNoWatch(zkw, SplitBalanceHelper.zkTable);
+            if(trafTables != null && hbaseTables != null) {
+              for (String tableName : trafTables) {
+                if (!hbaseTables.contains(tableName)) {
+                    if (LOG.isTraceEnabled()) LOG.trace("zkCleanup, removing " + zSplitBalPath + tableName);
+                    ZKUtil.deleteNodeRecursively(zkw, zSplitBalPath + tableName);
+                }
+              }
+            }
+        } catch (KeeperException ke) {
+            if (LOG.isErrorEnabled()) LOG.error("zkCleanup error, please check your ZooKeeper: " + ke);
+        }
+        if (LOG.isTraceEnabled()) LOG.trace("zkCleanup -- EXIT");
+    }
+
+}