You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2015/10/22 01:56:49 UTC

[3/6] incubator-trafodion git commit: [TRAFODION-34]Support region splitting/balancing

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
index b3a6516..521ee14 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
@@ -50,6 +50,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -62,6 +63,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -75,6 +78,10 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder;
 import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -108,7 +115,9 @@ public static final String trxkeycommitPendingTransactions = "commitPendingTrans
 public static final String trxkeypendingTransactionsById = "pendingTransactionsById";
 public static final String trxkeyindoubtTransactionsCountByTmid = "indoubtTransactionsCountByTmid";
 public static final String trxkeyClosingVar = "checkClosingVariable";
+public static final String trxkeyScanners = "trxScanners";
 
+public static final String SPLIT_DELAY_NOFLUSH = "hbase.transaction.split.delay.noflush";
 public static final String SPLIT_DELAY_LIMIT = "hbase.transaction.split.delay.limit";
 public static final String EARLY_DRAIN =       "hbase.transaction.split.drain.early";
 public static final String ACTIVE_DELAY_LEN =  "hbase.transaction.split.active.delay";
@@ -133,9 +142,17 @@ private Map<Integer, Integer> indoubtTransactionsCountByTmid = new TreeMap<Integ
 // Map for Transactional Region to exchange data structures between Region Observer coprocessor and Endpoint Coprocessor
 static ConcurrentHashMap<String, Object> transactionsRefMap = new ConcurrentHashMap<String, Object>();
 
+private ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners =
+                    new ConcurrentHashMap<Long, TransactionalRegionScannerHolder>();
+
+static ConcurrentHashMap<String, Object> trxRegionMap;
+
 private ConcurrentHashMap<String, TrxTransactionState> transactionsById = new ConcurrentHashMap<String, TrxTransactionState>();
 private Set<TrxTransactionState> commitPendingTransactions = Collections.synchronizedSet(new HashSet<TrxTransactionState>());
 private AtomicBoolean closing = new AtomicBoolean(false);
+private boolean hasClosed = false;
+private boolean hasFlushed = false;
+
 
 HRegion my_Region;
 HRegionInfo regionInfo;
@@ -145,6 +162,7 @@ String hostName;
 int port;
 int splitDelayLimit;
 boolean earlyDrain;
+boolean splitDelayNoFlush;
 int activeDelayLen;
 int pendingDelayLen;
 long activeCount = 0;
@@ -159,15 +177,18 @@ int flushCount = 0;
 int regionState = 0;
 private Object recoveryCheckLock = new Object();
 private Object editReplay = new Object();
-private AtomicInteger nextSequenceId = new AtomicInteger(0);
-
-private static String zNodePath = "/hbase/Trafodion/recovery/";
+public static String zTrafPath = "/hbase/Trafodion/";
+private static String zNodePath = zTrafPath + "recovery/";
 private static ZooKeeperWatcher zkw1 = null;
 private static Object zkRecoveryCheckLock = new Object();
 
+SplitBalanceHelper sbHelper;
+
 // Region Observer Coprocessor START
 @Override
 public void start(CoprocessorEnvironment e) throws IOException {
+    trxRegionMap = TrxRegionEndpoint.getRegionMap();
+
     RegionCoprocessorEnvironment regionCoprEnv = (RegionCoprocessorEnvironment)e;
     RegionCoprocessorEnvironment re = (RegionCoprocessorEnvironment) e;
     my_Region = re.getRegion();
@@ -178,6 +199,7 @@ public void start(CoprocessorEnvironment e) throws IOException {
     this.activeDelayLen = conf.getInt(ACTIVE_DELAY_LEN, ACTIVETXN_DELAY_DEFAULT);
     this.pendingDelayLen = conf.getInt(PENDING_DELAY_LEN, PENDINGTXN_DELAY_DEFAULT);
     this.earlyDrain = conf.getBoolean(EARLY_DRAIN, false);
+    this.splitDelayNoFlush = conf.getBoolean(SPLIT_DELAY_NOFLUSH, true);
 
     if (LOG.isTraceEnabled()) {
         LOG.trace("Properties for -- " + regionInfo.getRegionNameAsString());
@@ -185,6 +207,7 @@ public void start(CoprocessorEnvironment e) throws IOException {
         LOG.trace("Property: activeDelayLen = " + this.activeDelayLen);
         LOG.trace("Property: pendingDelayLen = " + this.pendingDelayLen);
         LOG.trace("Property: earlyDrain = " + this.earlyDrain);
+        LOG.trace("Property: splitDelayNoFlush = " + this.splitDelayNoFlush);
     }
     
     if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: trxRegionObserver load start ");
@@ -263,7 +286,20 @@ public void start(CoprocessorEnvironment e) throws IOException {
        transactionsRefMap.put(regionName+trxkeyClosingVar, this.closing);
    }
 
-    if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: trxRegionObserver load start complete");
+   @SuppressWarnings("unchecked")
+   ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck =
+       (ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsRefMap
+           .get(regionName+trxkeyScanners);
+   if(scannersCheck != null) {
+     this.scanners = scannersCheck;
+   }
+   else {
+     transactionsRefMap.put(regionName+trxkeyScanners, this.scanners);
+   }
+
+   sbHelper = new SplitBalanceHelper(my_Region, zkw1);
+
+   if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery Region Observer CP: trxRegionObserver load start complete");
 
 } // end of start
 
@@ -411,6 +447,30 @@ static ConcurrentHashMap<String, Object> getRefMap() {
 @Override
 public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
 
+  if(!this.splitDelayNoFlush) {
+    @SuppressWarnings("rawtypes")
+    TrxRegionEndpoint tre = (TrxRegionEndpoint)trxRegionMap.get(regionInfo.getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
+    if(tre == null) {
+        LOG.error("Unable to obtain TrxRegionEndpoint object from shared map for " + regionInfo.getRegionNameAsString());
+    }
+    else {
+  	  Path readPath = null;
+  	  StringBuilder sbPath = new StringBuilder();
+  	  if(sbHelper.getSplit(sbPath)) {
+  		  sbHelper.clearSplit();
+  	  }
+  	  else if(sbHelper.getBalance(sbPath)) {
+  	    readPath = new Path(sbPath.toString());
+  	    try {
+  		  tre.readTxnInfo(readPath);
+  	    } catch(IOException ioe) {
+  	      if (LOG.isErrorEnabled()) LOG.error("Unable to read Transactional Info for balance coordination: " + ioe);
+  	    }
+  		  sbHelper.clearBalance();
+  	  }
+    }
+  }
+
    //        Trafodion Recovery : after Open, we should have alreday constructed all the indoubt transactions in
    //        pendingTransactionsById now process it and construct transaction list by TM id. These two data
    //        structures are put in the reference map which is shared with TrxRegionEndpoint coprocessor class per region 
@@ -577,75 +637,110 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro
        }
 } // end of createRecoveryzNode
 
-    protected void pendingWait() throws IOException {
-        int count = 1;
-        while(!commitPendingTransactions.isEmpty()) {
-            try {
-                if(LOG.isDebugEnabled()) LOG.debug("pendingWait() delay, count " + count++ + " on: " + regionInfo.getRegionNameAsString());
-                Thread.sleep(this.pendingDelayLen);
-            } catch(InterruptedException e) {
-                String error = "Problem while calling sleep() on pendingWait delay, " + e;
-                if(LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on preSplit delay, returning. " + e);
-                throw new IOException(error);
-            }
-        }
-    }
 
-    protected void activeWait() throws IOException {
-        int counter = 0;
-        int minutes = 0;
-        int currentMin = 0;
-
-        boolean delayMsg = false;
-        while(!transactionsById.isEmpty()) {
-            try {
-                delayMsg = true;
-                Thread.sleep(this.activeDelayLen);
-                counter++;
-                currentMin = (counter * this.activeDelayLen) / 60000;
-
-                if(currentMin > minutes) {
-                    minutes = currentMin;
-                    if (LOG.isInfoEnabled()) LOG.info("Delaying split due to transactions present. Delayed : " + 
-                                                      minutes + " minute(s) on " + regionInfo.getRegionNameAsString());
-                }
-                if(minutes >= this.splitDelayLimit) {
-                    if(LOG.isWarnEnabled()) LOG.warn("Surpassed split delay limit of " + this.splitDelayLimit
-                                                    + " minutes. Continuing with split");
-                    delayMsg = false;
-                    break;
-                }
-            } catch (InterruptedException e) {
-                String error = "Problem while calling sleep() on preSplit delay - activeWait: " + e;
-                if(LOG.isErrorEnabled()) LOG.error(error);
-                throw new IOException(error);
-            }
-        }
-        if(delayMsg) {
-          if(LOG.isWarnEnabled()) LOG.warn("Continuing with split operation, no active transactions on: " + regionInfo.getRegionNameAsString());
-        }
-    }
 
     @Override
     public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException {
         if(LOG.isTraceEnabled()) LOG.trace("preSplit -- ENTRY region: " + regionInfo.getRegionNameAsString());
 
-        if(!this.earlyDrain) {
-          activeWait();
+        if(splitDelayNoFlush) {
+            if(!this.earlyDrain)
+              sbHelper.activeWait(transactionsById, activeDelayLen, splitDelayLimit);
+            closing.set(true);
+            sbHelper.pendingWait(commitPendingTransactions, pendingDelayLen);
+        }
+        else {
+            sbHelper.pendingAndScannersWait(commitPendingTransactions, scanners, pendingDelayLen);
+            closing.set(true);
+
+            sbHelper.setSplit();
         }
-        closing.set(true);
-        pendingWait();
 
         if(LOG.isTraceEnabled()) LOG.trace("preSplit -- EXIT region: " + regionInfo.getRegionNameAsString());
     }
 
     @Override
+    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) {
+        // No-op when splitDelayNoFlush is set. Otherwise looks up the TrxRegionEndpoint registered in
+        // trxRegionMap for each daughter region and calls readTxnInfo(sbHelper.getPath(), true) on both:
+        // the left daughter on a helper thread, the right daughter inline. Both endpoints are flagged
+        // as closing while the reads are in flight; the split marker is cleared only on success.
+        if(splitDelayNoFlush)
+          return;
+
+        @SuppressWarnings("rawtypes")
+        TrxRegionEndpoint treL = (TrxRegionEndpoint)trxRegionMap.get(l.getRegionInfo().getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
+        @SuppressWarnings("rawtypes")
+        TrxRegionEndpoint treR = (TrxRegionEndpoint)trxRegionMap.get(r.getRegionInfo().getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
+        if(treL == null || treR == null) {
+            LOG.error("Unable to obtain TrxRegionEndpoint object from shared map for " + regionInfo.getRegionNameAsString());
+            return;
+        }
+
+        try {
+            treL.setClosing(true);
+            treR.setClosing(true);
+            try {
+                Thread readThread = new Thread(new TxnReadThread(treL, sbHelper.getPath(), true));
+                readThread.start();
+                try {
+                    treR.readTxnInfo(sbHelper.getPath(), true);
+                } finally {
+                    // Wait for the left-daughter read even when the right-daughter read threw.
+                    readThread.join();
+                }
+            } finally {
+                // Always undo the closing flag; previously a failed or interrupted read left both
+                // daughters flagged as closing. If join() was interrupted, the helper thread may
+                // still be running at this point.
+                treL.setClosing(false);
+                treR.setClosing(false);
+            }
+            sbHelper.clearSplit();
+        } catch (IOException ioe) {
+            if(LOG.isErrorEnabled()) LOG.error("Unable to read Transaction Info for transactional split coordination: " + ioe);
+        } catch (InterruptedException ie) {
+            if(LOG.isErrorEnabled()) LOG.error("Thread issue hit while trying to read Transaction Info for transactional split coordination: " + ie);
+            // Restore the interrupt status so the calling thread can still observe it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
     public void    preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) {
-        if(LOG.isInfoEnabled()) {
-            HRegion region = c.getEnvironment().getRegion();
-            LOG.info("preClose -- setting close var to true on: " + region.getRegionNameAsString());
+        // Close-time coordination, active only when splitDelayNoFlush is false and the region server
+        // is neither stopping nor stopped:
+        //   1. wait via sbHelper.pendingAndScannersWait(), then set the shared closing flag;
+        //   2. call flushToFS(sbHelper.getPath()) on this region's TrxRegionEndpoint and, unless
+        //      sbHelper.getSplit() reports a split, call sbHelper.setBalance().
+        // hasClosed / hasFlushed record which of the two steps already ran on this observer instance.
+        // NOTE(review): hasClosed is part of the first guard, so a later call cannot retry a flush
+        // that failed (hasFlushed == false), and the inner !hasClosed test is always true -- confirm
+        // that this is intended.
+        // NOTE(review): when splitDelayNoFlush is true this method returns without setting the
+        // closing flag, which the removed code set unconditionally -- confirm.
+        if (hasFlushed ||
+            hasClosed ||
+            splitDelayNoFlush ||
+            c.getEnvironment().getRegionServerServices().isStopping() ||
+            c.getEnvironment().getRegionServerServices().isStopped())
+            return;
+
+        if (!hasClosed) {
+            // Guard matches the level actually logged (was isInfoEnabled() around LOG.debug()).
+            if(LOG.isDebugEnabled()) {
+                HRegion region = c.getEnvironment().getRegion();
+                LOG.debug("preClose -- setting close var to true on: " + region.getRegionNameAsString());
+            }
+            try {
+              sbHelper.pendingAndScannersWait(commitPendingTransactions, scanners, pendingDelayLen);
+            } catch(IOException ioe) {
+              // Logged only: the closing flag is still set below when the wait fails.
+              LOG.error("Encountered exception when calling pendingAndScannersWait(): " + ioe);
+            }
+            closing.set(true);
+            hasClosed = true;
+        }
+
+        // The server state is read again here, after the wait above, before attempting the flush.
+        if (c.getEnvironment().getRegionServerServices().isStopping() ||
+            c.getEnvironment().getRegionServerServices().isStopped() ||
+            hasFlushed)
+            return;
+
+        @SuppressWarnings("rawtypes")
+        TrxRegionEndpoint tre = (TrxRegionEndpoint)trxRegionMap.get(regionInfo.getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
+        if(tre == null) {
+            LOG.error("Unable to obtain TrxRegionEndpoint object from shared map for " + regionInfo.getRegionNameAsString());
+        }
+        else {
+          try {
+            tre.flushToFS(sbHelper.getPath());
+            if(!sbHelper.getSplit()) {
+              try {
+               sbHelper.setBalance();
+              } catch (IOException ioe) {
+               if(LOG.isErrorEnabled()) LOG.error("Unable to set balance: " + ioe);
+              }
+            }
+            hasFlushed = true;
+          } catch (IOException ioe) {
+            // Include the cause, as the other error logs in this class do.
+            if (LOG.isErrorEnabled()) LOG.error("Unable to flush to filesystem: " + ioe);
+            hasFlushed = false;
+          }
         }
-        closing.set(true);
     }
 
     @Override
@@ -656,6 +751,7 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro
         transactionsRefMap.remove(regionName+trxkeytransactionsById);
         transactionsRefMap.remove(regionName+trxkeycommitPendingTransactions);
         transactionsRefMap.remove(regionName+trxkeyClosingVar);
+        transactionsRefMap.remove(regionName+trxkeyScanners);
     }
 
     protected InternalScanner getWrappedScanner(final long lowStartId, final InternalScanner s) {
@@ -787,4 +883,22 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro
         return getWrappedScanner(lowStartId , scanner); 
     }
 
+    /**
+     * Runnable that calls readTxnInfo(path, isSplit) on a single TrxRegionEndpoint, so the call
+     * can run on its own thread. An IOException from the read is logged and not rethrown.
+     * NOTE(review): the thread that joins on this task cannot tell whether the read failed.
+     */
+    private static class TxnReadThread implements Runnable {
+      // Target endpoint and the arguments forwarded to readTxnInfo(); fixed at construction.
+      @SuppressWarnings("rawtypes")
+      private final TrxRegionEndpoint tre;
+      private final Path path;
+      private final boolean isSplit;
+
+      TxnReadThread(@SuppressWarnings("rawtypes") TrxRegionEndpoint tre, Path path, boolean isSplit) {
+        this.tre = tre;
+        this.path = path;
+        this.isSplit = isSplit;
+      }
+
+      @Override
+      public void run(){
+        try {
+          tre.readTxnInfo(path, isSplit);
+        } catch (IOException ioe) {
+          if(LOG.isErrorEnabled()) LOG.error("Unable to read Transaction Info for transactional split coordination: " + ioe);
+        }
+      }
+    }
 } // end of TrxRegionObserver Class