You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by sa...@apache.org on 2017/05/12 20:25:42 UTC
[07/22] incubator-trafodion git commit: This is a large contribution
of changes from Esgyn TransactionManager and libraries that are collectively
much better tested and hardened than Trafodion,
but are too numerous and complex to cherry pick individually
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
index 403df5f..78ed97e 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
@@ -28,6 +28,7 @@ import java.io.IOException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
@@ -56,6 +57,7 @@ import java.util.ListIterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.Set;
@@ -93,6 +95,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
@@ -107,6 +110,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerTimeoutException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.transactional.MemoryUsageException;
+import org.apache.hadoop.hbase.client.transactional.NonPendingTransactionException;
import org.apache.hadoop.hbase.client.transactional.OutOfOrderProtocolException;
import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
import org.apache.hadoop.hbase.client.transactional.BatchException;
@@ -116,6 +120,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+#ifdef HDP2.3 HDP2.4 CDH5.7 APACHE1.2
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.ScheduledChore;
+#endif
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -142,18 +150,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.LeaseException;
import org.apache.hadoop.hbase.regionserver.LeaseListener;
import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
//import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+#ifdef CDH5.7 APACHE1.2
+import org.apache.hadoop.hbase.regionserver.Region;
+#endif
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
@@ -235,6 +246,10 @@ import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProt
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
+#ifdef CDH5.7 APACHE1.2
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+#endif
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
@@ -307,8 +322,6 @@ CoprocessorService, Coprocessor {
private final Boolean splitDelayEnabled = false;
private final Boolean doWALHlog = false;
static Leases transactionLeases = null;
- // Joanie: commenting out scanner leases for now
- //static Leases scannerLeases = null;
CleanOldTransactionsChore cleanOldTransactionsThread;
static MemoryUsageChore memoryUsageThread = null;
Stoppable stoppable = new StoppableImplementation();
@@ -318,7 +331,7 @@ CoprocessorService, Coprocessor {
private int regionState = 0;
private Path recoveryTrxPath = null;
private int cleanAT = 0;
- private long onlineEpoch = EnvironmentEdgeManager.currentTime();
+ private long onlineEpoch = EnvironmentEdgeManager.currentTime();
private long[] commitCheckTimes = new long[50];
private long[] hasConflictTimes = new long[50];
@@ -347,8 +360,13 @@ CoprocessorService, Coprocessor {
private double avgWriteToLogTime = 0;
private HRegionInfo regionInfo = null;
+#ifdef CDH5.7 APACHE1.2
+ private Region m_Region = null;
+#else
private HRegion m_Region = null;
+#endif
private String m_regionName = null;
+ private String m_regionDetails = null;
private boolean m_isTrafodionMetadata = false;
#ifdef CDH5.7 APACHE1.2
private HRegion t_Region = null;
@@ -376,7 +394,7 @@ CoprocessorService, Coprocessor {
private static final int MINIMUM_LEASE_TIME = 7200 * 1000;
private static final int LEASE_CHECK_FREQUENCY = 1000;
- private static final int DEFAULT_SLEEP = 60 * 1000;
+ private static final int DEFAULT_SLEEP = 30 * 1000; // 30 seconds
private static final int DEFAULT_MEMORY_THRESHOLD = 100; // 100% memory used
private static final int DEFAULT_STARTUP_MEMORY_THRESHOLD = 90; // initial value : 90% memory used
private static final int DEFAULT_MEMORY_SLEEP = 15 * 1000;
@@ -386,6 +404,7 @@ CoprocessorService, Coprocessor {
private static final boolean DEFAULT_SKIP_WAL = false;
private static final boolean DEFAULT_COMMIT_EDIT = false;
private static final boolean DEFAULT_SUPPRESS_OOP = false;
+ private static final boolean DEFAULT_TM_USE_COMMIT_ID_IN_CELLS = false;
private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
private static final String LEASE_CONF = "hbase.transaction.lease.timeout";
private static final String MEMORY_THRESHOLD = "hbase.transaction.memory.threshold";
@@ -397,6 +416,7 @@ CoprocessorService, Coprocessor {
private static final String CONF_COMMIT_EDIT = "hbase.trafodion.full.commit.edit";
private static final String SUPPRESS_OOP = "hbase.transaction.suppress.OOP.exception";
private static final String CHECK_ROW = "hbase.transaction.check.row";
+ private static final String CONF_TM_USE_COMMIT_ID_IN_CELLS = "hbase.transaction.use.commitId";
protected static int transactionLeaseTimeout = 0;
private static int scannerLeaseTimeoutPeriod = 0;
private static int scannerThreadWakeFrequency = 0;
@@ -406,6 +426,7 @@ CoprocessorService, Coprocessor {
private static int asyncWal = DEFAULT_ASYNC_WAL;
private static boolean skipWal = DEFAULT_SKIP_WAL;
private static boolean fullEditInCommit = DEFAULT_COMMIT_EDIT;
+ private static boolean useCommitIdInCells = DEFAULT_TM_USE_COMMIT_ID_IN_CELLS;
private static MemoryMXBean memoryBean = null;
private static float memoryPercentage = 0;
private static boolean memoryThrottle = false;
@@ -449,6 +470,7 @@ CoprocessorService, Coprocessor {
// TBD Maybe we should just use HashMap to improve the performance, ConcurrentHashMap could be too strict
static ConcurrentHashMap<String, Object> transactionsEPCPMap;
+ long choreCount = 1;
// TrxRegionService methods
@Override
@@ -459,44 +481,31 @@ CoprocessorService, Coprocessor {
long transactionId = request.getTransactionId();
boolean dropTableRecorded = request.getDropTableRecorded();
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: abortTransaction - txId " + transactionId +
- ", dropTableRecoded " + dropTableRecorded + ", regionName " + regionInfo.getRegionNameAsString());
+ boolean ignoreUnknownTransaction = request.getIgnoreUnknownTransactionException();
+ if (LOG.isTraceEnabled()) LOG.trace("abortTransaction - txId " + transactionId +
+ ", dropTableRecoded " + dropTableRecorded + ", regionName " + m_regionDetails);
IOException ioe = null;
UnknownTransactionException ute = null;
- WrongRegionException wre = null;
Throwable t = null;
- /* commenting out for the time being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
-
- // First test if this region matches our region name
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:abortTransaction threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- } else
- */
- {
// Process in local memory
+ int participant = request.getParticipantNum();
try {
- abortTransaction(transactionId, dropTableRecorded);
+ abortTransaction(transactionId, dropTableRecorded, ignoreUnknownTransaction);
} catch (UnknownTransactionException u) {
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId "
+ LOG.error("TrxRegionEndpoint coprocessor:abort - txId "
+ transactionId
+ + " participant " + participant
+ ", Caught UnknownTransactionException after internal abortTransaction call - "
+ u.getMessage() + " "
+ stackTraceToString(u));
ute = u;
} catch (IOException e) {
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + ", Caught IOException after internal abortTransaction call - " + e.getMessage() + " " + stackTraceToString(e));
+ LOG.error("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + " participant " + participant
+ + ", Caught IOException after internal abortTransaction call - ", e);
ioe = e;
}
- }
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionResponse.Builder abortTransactionResponseBuilder = AbortTransactionResponse.newBuilder();
@@ -508,12 +517,6 @@ CoprocessorService, Coprocessor {
abortTransactionResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- abortTransactionResponseBuilder.setHasException(true);
- abortTransactionResponseBuilder.setException(wre.toString());
- }
-
if (ioe != null)
{
abortTransactionResponseBuilder.setHasException(true);
@@ -543,20 +546,20 @@ CoprocessorService, Coprocessor {
String requestRegionName;
IOException ioe = null;
UnknownTransactionException ute = null;
- WrongRegionException wre = null;
Throwable t = null;
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: abortMultiple - txId " + transactionId + ", master regionName " + regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: abortMultiple - txId " + transactionId + " number of region is commitMultiple " + numOfRegion);
+ if (LOG.isTraceEnabled()) LOG.trace("abortMultiple - txId " + transactionId + " number of region is commitMultiple "
+ + numOfRegion + ", master regionName " + m_regionName);
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse.Builder abortTransactionMultipleResponseBuilder = AbortTransactionMultipleResponse.newBuilder();
abortTransactionMultipleResponseBuilder.setHasException(false);
+ int participant = 0;
while (i < numOfRegion) {
requestRegionName = request.getRegionName(i).toStringUtf8();
abortTransactionMultipleResponseBuilder.addException(BatchException.EXCEPTION_OK.toString());
-
try {
+ participant = request.getParticipantNum();
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint abortMultiple begins for region " + requestRegionName);
TrxRegionEndpoint regionEPCP = (TrxRegionEndpoint) transactionsEPCPMap.get(requestRegionName+trxkeyEPCPinstance);
if (regionEPCP == null) {
@@ -570,10 +573,11 @@ CoprocessorService, Coprocessor {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint abortMultiple ends");
// abortTransaction(transactionId);
} catch (UnknownTransactionException u) {
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + ", Caught UnknownTransactionException after internal abortTransaction call - " + u.getMessage() + " " + stackTraceToString(u));
+ LOG.error("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + " participant " + participant + ", Caught UnknownTransactionException after internal abortTransaction call - " + u.getMessage() + " " + stackTraceToString(u));
ute = u;
} catch (IOException e) {
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + ", Caught IOException after internal abortTransaction call - " + e.getMessage() + " " + stackTraceToString(e));
+ LOG.error("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + " participant " + participant
+ + ", Caught IOException after internal abortTransaction call - ", e);
ioe = e;
}
@@ -583,12 +587,6 @@ CoprocessorService, Coprocessor {
abortTransactionMultipleResponseBuilder.setException(i, t.toString());
}
- if (wre != null)
- {
- abortTransactionMultipleResponseBuilder.setHasException(true);
- abortTransactionMultipleResponseBuilder.setException(i, wre.toString());
- }
-
if (ioe != null)
{
abortTransactionMultipleResponseBuilder.setHasException(true);
@@ -618,39 +616,26 @@ CoprocessorService, Coprocessor {
Throwable t = null;
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
- WrongRegionException wre = null;
MemoryUsageException mue = null;
long transactionId = request.getTransactionId();
+ long startId = request.getStartId();
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: beginTransaction - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString());
-
- // First test if this region matches our region name
+ if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - txId " + transactionId + ", regionName " + m_regionDetails);
- /* commenting it out for the time-being
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:beginTransaction threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- }else
- */
{
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
- LOG.warn("TrxRegionEndpoint coprocessor: beginTransaction - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("beginTransaction - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: beginTransaction - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("beginTransaction memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
else
{
try {
- beginTransaction(transactionId);
+ beginTransaction(transactionId, startId);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("beginTransaction - txId "
+ transactionId + ", Caught exception ", e);
@@ -669,15 +654,9 @@ CoprocessorService, Coprocessor {
beginTransactionResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- beginTransactionResponseBuilder.setHasException(true);
- beginTransactionResponseBuilder.setException(wre.toString());
- }
-
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: beginTransaction - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("beginTransaction - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
beginTransactionResponseBuilder.setHasException(true);
beginTransactionResponseBuilder.setException(mue.toString());
}
@@ -694,16 +673,17 @@ CoprocessorService, Coprocessor {
CommitResponse response = CommitResponse.getDefaultInstance();
Throwable t = null;
- WrongRegionException wre = null;
long transactionId = request.getTransactionId();
+ long commitId = request.getCommitId();
final int participantNum = request.getParticipantNum();
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commit - txId "
- + transactionId + ", participantNum " + participantNum + ", regionName " + regionInfo.getRegionNameAsString());
+ if (LOG.isDebugEnabled()) LOG.debug("commit - txId "
+ + transactionId + ", commitId " + commitId + ", participantNum " + participantNum
+ + ", region " + m_regionDetails);
// Process local memory
try {
- commit(transactionId, participantNum, request.getIgnoreUnknownTransactionException());
+ commit(transactionId, commitId, participantNum, request.getIgnoreUnknownTransactionException());
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("commit - txId " + transactionId
+ ", Caught exception ", e);
@@ -720,12 +700,6 @@ CoprocessorService, Coprocessor {
commitResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- commitResponseBuilder.setHasException(true);
- commitResponseBuilder.setException(wre.toString());
- }
-
CommitResponse cresponse = commitResponseBuilder.build();
done.run(cresponse);
@@ -738,14 +712,14 @@ CoprocessorService, Coprocessor {
CommitMultipleResponse response = CommitMultipleResponse.getDefaultInstance();
Throwable t = null;
- WrongRegionException wre = null;
long transactionId = request.getTransactionId();
+ long commitId = request.getCommitId();
int i = 0;
int numOfRegion = request.getRegionNameCount();
String requestRegionName;
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitMultiple - txId " + transactionId + " master regionName " + regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitMultiple - txId " + transactionId + " number of region is commitMultiple " + numOfRegion);
+ if (LOG.isTraceEnabled()) LOG.trace("commitMultiple - txId " + transactionId + " master regionName " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("commitMultiple - txId " + transactionId + " number of region is commitMultiple " + numOfRegion);
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse.Builder commitMultipleResponseBuilder = CommitMultipleResponse.newBuilder();
commitMultipleResponseBuilder.setHasException(false);
@@ -763,7 +737,7 @@ CoprocessorService, Coprocessor {
commitMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
}
else {
- regionEPCP.commit(transactionId, i, request.getIgnoreUnknownTransactionException());
+ regionEPCP.commit(transactionId, commitId, i, request.getIgnoreUnknownTransactionException());
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitMultiple ends");
//commit(transactionId, request.getIgnoreUnknownTransactionException());
@@ -779,12 +753,6 @@ CoprocessorService, Coprocessor {
commitMultipleResponseBuilder.setException(i, t.toString());
}
- if (wre != null)
- {
- commitMultipleResponseBuilder.setHasException(true);
- commitMultipleResponseBuilder.setException(i, wre.toString());
- }
-
i++; // move to next region
} // end of while-loop on all the regions in thecommitMultiple request
@@ -806,11 +774,10 @@ CoprocessorService, Coprocessor {
long startEpoch = request.getStartEpoch();
final int participantNum = request.getParticipantNum();
Throwable t = null;
- WrongRegionException wre = null;
// Process local memory
try {
- if (LOG.isDebugEnabled()) LOG.debug("commitIfPossible - txId " + transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible");
+ if (LOG.isDebugEnabled()) LOG.debug("commitIfPossible - txId " + transactionId + ", regionName, " + m_regionDetails + "calling internal commitIfPossible");
reply = commitIfPossible(transactionId, startEpoch, commitId, participantNum);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("commitIfPossible - txId " + transactionId
@@ -828,12 +795,6 @@ CoprocessorService, Coprocessor {
commitIfPossibleResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- commitIfPossibleResponseBuilder.setHasException(true);
- commitIfPossibleResponseBuilder.setException(wre.toString());
- }
-
CommitIfPossibleResponse cresponse = commitIfPossibleResponseBuilder.build();
done.run(cresponse);
}
@@ -850,7 +811,6 @@ CoprocessorService, Coprocessor {
UnknownTransactionException ute = null;
CommitConflictException cce = null;
Throwable t = null;
- WrongRegionException wre = null;
long transactionId = request.getTransactionId();
long startEpoch = request.getStartEpoch();
int participantNum = request.getParticipantNum();
@@ -858,13 +818,13 @@ CoprocessorService, Coprocessor {
if (LOG.isTraceEnabled()) LOG.trace("commitRequest - txId "
+ transactionId + ", startEpoch " + startEpoch + ", participantNum " + participantNum + ", dropTableRecorded " + dropTableRecorded +
- ", regionName " + regionInfo.getRegionNameAsString());
+ ", regionName " + m_regionDetails);
// Process local memory
try {
status = commitRequest(transactionId, startEpoch, participantNum, dropTableRecorded);
} catch (UnknownTransactionException u) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
+ LOG.info("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call in region " + m_regionDetails, u);
ute = u;
status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
} catch (CommitConflictException c) {
@@ -872,7 +832,8 @@ CoprocessorService, Coprocessor {
cce = c;
status = COMMIT_CONFLICT;
} catch (IOException e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught IOException after internal commitRequest call - "+ e.toString());
+ LOG.error("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + " participant " + participantNum
+ +" , Caught IOException after internal commitRequest call - ", e);
ioe = e;
status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
}
@@ -893,12 +854,6 @@ CoprocessorService, Coprocessor {
commitRequestResponseBuilder.setException(cce.toString());
}
- if (wre != null)
- {
- commitRequestResponseBuilder.setHasException(true);
- commitRequestResponseBuilder.setException(wre.toString());
- }
-
if (ioe != null)
{
commitRequestResponseBuilder.setHasException(true);
@@ -928,7 +883,6 @@ CoprocessorService, Coprocessor {
IOException ioe = null;
UnknownTransactionException ute = null;
Throwable t = null;
- WrongRegionException wre = null;
long transactionId = request.getTransactionId();
long startEpoch = request.getStartEpoch();
int i = 0;
@@ -945,20 +899,20 @@ CoprocessorService, Coprocessor {
requestRegionName = request.getRegionName(i).toStringUtf8();
/*
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + Hex.encodeHexString(request.getRegionName(i).toStringUtf8().getBytes()));
- if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + this.m_Region.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + regionInfo.getRegionNameAsString());
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + this.m_Region.getRegionName().toString());
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA0 Region Key " + Hex.encodeHexString(ByteString.copyFrom(this.m_Region.getRegionName()).toString().getBytes()));
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA1 Region Key " + Hex.encodeHexString(requestRegionName.getBytes()));
- if (LOG.isTraceEnabled()) LOG.trace("EPCP AA2 Region Key " + Hex.encodeHexString(this.m_Region.getRegionNameAsString().getBytes()));
+ if (LOG.isTraceEnabled()) LOG.trace("EPCP AA2 Region Key " + Hex.encodeHexString(regionInfo.getRegionNameAsString().getBytes()));
if (LOG.isTraceEnabled()) LOG.trace("EPCP AA2 Region Key " + Hex.encodeHexString(this.m_Region.getRegionName()));
if (requestRegionName.equals(ByteString.copyFrom(this.m_Region.getRegionName()).toString())) {
if (LOG.isTraceEnabled()) { LOG.trace("EPCP BB0 Region Key matches !! " + request.getRegionName(i).toString()); }
}
- if (Arrays.equals(request.getRegionName(i).toStringUtf8().getBytes(), this.m_Region.getRegionNameAsString().getBytes())) {
+ if (Arrays.equals(request.getRegionName(i).toStringUtf8().getBytes(), regionInfo.getRegionNameAsString().getBytes())) {
if (LOG.isTraceEnabled()) { LOG.trace("EPCP BB1 Region Key matches !! " + request.getRegionName(i).toString()); }
}
- if (request.getRegionName(i).toStringUtf8().equals(this.m_Region.getRegionNameAsString())) {
+ if (request.getRegionName(i).toStringUtf8().equals(regionInfo.getRegionNameAsString())) {
if (LOG.isTraceEnabled()) { LOG.trace("EPCP BB2 Region Key matches !! " + request.getRegionName(i).toString()); }
}
*/
@@ -979,10 +933,11 @@ CoprocessorService, Coprocessor {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple ends");
//status = commitRequest(transactionId);
} catch (UnknownTransactionException u) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequestMultiple - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
+ if (LOG.isTraceEnabled()) LOG.trace("commitRequestMultiple - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
ute = u;
} catch (IOException e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequestMultiple - txId " + transactionId + ", Caught IOException after internal commitRequest call - "+ e.toString());
+ LOG.error("commitRequestMultiple - txId " + transactionId
+ + ", Caught IOException after internal commitRequest call - ", e);
ioe = e;
}
@@ -992,12 +947,6 @@ CoprocessorService, Coprocessor {
commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_SKIPREMAININGREGIONS_OK.toString());
}
- if (wre != null)
- {
- commitRequestMultipleResponseBuilder.setHasException(true);
- commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_SKIPREMAININGREGIONS_OK.toString());
- }
-
if (ioe != null)
{
commitRequestMultipleResponseBuilder.setHasException(true);
@@ -1033,9 +982,9 @@ CoprocessorService, Coprocessor {
Delete delete = null;
Throwable t = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
boolean result = false;
long tid = request.getTid();
+ long commitId = request.getCommitId();
boolean autoCommit = request.getAutoCommit();
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
@@ -1056,7 +1005,6 @@ CoprocessorService, Coprocessor {
}
if (mue == null &&
- wre == null &&
type == MutationType.DELETE &&
proto.hasRow()){
try {
@@ -1079,7 +1027,8 @@ CoprocessorService, Coprocessor {
if (t == null) {
try {
- result = checkAndDeleteRegionTx(tid,
+ result = checkAndDeleteRegionTx(tid, tid,
+ commitId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
@@ -1110,11 +1059,6 @@ CoprocessorService, Coprocessor {
checkAndDeleteRegionTxResponseBuilder.setException(t.toString());
}
- if (wre != null){
- checkAndDeleteRegionTxResponseBuilder.setHasException(true);
- checkAndDeleteRegionTxResponseBuilder.setException(wre.toString());
- }
-
if (mue != null){
if (LOG.isWarnEnabled()) LOG.warn("checkAndDeleteRegionTx - performing memoryPercentage " + memoryPercentage
+ ", posting memory usage exceeds indicated percentage");
@@ -1140,41 +1084,28 @@ CoprocessorService, Coprocessor {
Delete delete = null;
Throwable t = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
boolean result = false;
long transactionId = request.getTransactionId();
+ long startId = request.getStartId();
+
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
// First test if this region matches our region name
- /* commenting it out for the time-being
-
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:checkAndDelete threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- }
- */
-
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteResponse.Builder checkAndDeleteResponseBuilder = CheckAndDeleteResponse.newBuilder();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
- LOG.warn("TrxRegionEndpoint coprocessor: checkAndDelete - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("checkAndDelete - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndDelete - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("checkAndDelete - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("checkAndDelete memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
if (mue == null &&
- wre == null &&
type == MutationType.DELETE &&
proto.hasRow())
{
@@ -1200,6 +1131,7 @@ CoprocessorService, Coprocessor {
try {
result = checkAndDelete(transactionId,
+ startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
@@ -1221,7 +1153,7 @@ CoprocessorService, Coprocessor {
checkAndDeleteResponseBuilder.setResult(result);
}
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndDelete - txId " + transactionId + ", result is " + result);
+ if (LOG.isTraceEnabled()) LOG.trace("checkAndDelete - txId " + transactionId + ", result is " + result);
checkAndDeleteResponseBuilder.setHasException(false);
@@ -1231,15 +1163,9 @@ CoprocessorService, Coprocessor {
checkAndDeleteResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- checkAndDeleteResponseBuilder.setHasException(true);
- checkAndDeleteResponseBuilder.setException(wre.toString());
- }
-
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndDelete - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("checkAndDelete - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
checkAndDeleteResponseBuilder.setHasException(true);
checkAndDeleteResponseBuilder.setException(mue.toString());
}
@@ -1265,40 +1191,24 @@ CoprocessorService, Coprocessor {
MutationType type = proto.getMutateType();
Put put = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
Throwable t = null;
boolean result = false;
long transactionId = request.getTransactionId();
-
- /* commenting it out for the time-being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
-
- // First test if this region matches our region name
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:checkAndPut threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- }
- */
+ long startId = request.getStartId();
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutResponse.Builder checkAndPutResponseBuilder = CheckAndPutResponse.newBuilder();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
- LOG.warn("TrxRegionEndpoint coprocessor: checkAndPut - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("checkAndPut - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
mue = new MemoryUsageException("checkAndPut memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndPut - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
+ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
}
}
if (mue == null &&
- wre == null &&
type == MutationType.PUT &&
proto.hasRow())
{
@@ -1335,6 +1245,7 @@ CoprocessorService, Coprocessor {
try {
result = checkAndPut(transactionId,
+ startId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
@@ -1356,16 +1267,10 @@ CoprocessorService, Coprocessor {
checkAndPutResponseBuilder.setResult(result);
}
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndPut - txId " + transactionId + ", result is " + result);
+ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - txId " + transactionId + ", result is " + result);
checkAndPutResponseBuilder.setHasException(false);
- if (wre != null)
- {
- checkAndPutResponseBuilder.setHasException(true);
- checkAndPutResponseBuilder.setException(wre.toString());
- }
-
if (t != null)
{
checkAndPutResponseBuilder.setHasException(true);
@@ -1374,7 +1279,7 @@ CoprocessorService, Coprocessor {
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: checkAndPut - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
+ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
checkAndPutResponseBuilder.setHasException(true);
checkAndPutResponseBuilder.setException(mue.toString());
}
@@ -1397,14 +1302,15 @@ CoprocessorService, Coprocessor {
com.google.protobuf.ByteString qualifier = null;
com.google.protobuf.ByteString value = null;
long tid = request.getTid();
+ long commitId = request.getCommitId();
MutationProto proto = request.getPut();
MutationType type = proto.getMutateType();
boolean autoCommit = request.getAutoCommit();
Put put = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
Throwable t = null;
boolean result = false;
+ long startId = tid;
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRegionTxResponse.Builder checkAndPutRegionTxResponseBuilder = CheckAndPutRegionTxResponse.newBuilder();
@@ -1419,7 +1325,6 @@ CoprocessorService, Coprocessor {
}
if (mue == null &&
- wre == null &&
type == MutationType.PUT &&
proto.hasRow())
{
@@ -1456,6 +1361,7 @@ CoprocessorService, Coprocessor {
try {
result = checkAndPutRegionTx(tid,
+ startId, commitId,
request.getRow().toByteArray(),
request.getFamily().toByteArray(),
request.getQualifier().toByteArray(),
@@ -1482,12 +1388,6 @@ CoprocessorService, Coprocessor {
checkAndPutRegionTxResponseBuilder.setHasException(false);
- if (wre != null)
- {
- checkAndPutRegionTxResponseBuilder.setHasException(true);
- checkAndPutRegionTxResponseBuilder.setException(wre.toString());
- }
-
if (t != null)
{
checkAndPutRegionTxResponseBuilder.setHasException(true);
@@ -1514,27 +1414,11 @@ CoprocessorService, Coprocessor {
RegionScanner scanner = null;
Throwable t = null;
OutOfOrderProtocolException oop = null;
- WrongRegionException wre = null;
Exception ce = null;
long transId = request.getTransactionId();
long scannerId = request.getScannerId();
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: closeScanner - txId " + transId + ", scanner id " + scannerId + ", regionName " + regionInfo.getRegionNameAsString());
-
- /* commenting it out for the time-being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
-
- // First test if this region matches our region name
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: closeScanner threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- } else {
- */
+ if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", scanner id " + scannerId + ", regionName " + m_regionDetails);
// There should be a matching key in the transactionsById map
// associated with this transaction id. If there is not
@@ -1547,16 +1431,15 @@ CoprocessorService, Coprocessor {
if (keyFound != true)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: closeScanner - Unknown transaction [" + transId
- + "] in region ["
- + m_Region.getRegionInfo().getRegionNameAsString()
+ if (LOG.isTraceEnabled()) LOG.trace("closeScanner - Unknown transaction [" + transId
+ + "] in region [" + m_regionDetails
+ "], will create an OutOfOrderProtocol exception ");
oop = new OutOfOrderProtocolException("closeScanner does not have an active transaction with an open scanner, txId: " + transId);
}
if (oop == null) {
try {
- // we want to allow closing scaners and remove operations up until the very end.
+ // we want to allow closing scanners and remove operations up until the very end.
checkBlockAll(transId);
scanner = removeScanner(scannerId);
@@ -1564,22 +1447,22 @@ CoprocessorService, Coprocessor {
scanner.close();
}
else
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: closeScanner - txId " + transId + ", scanner was null for scanner id " + scannerId);
+ if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", scanner was null for scanner id " + scannerId);
/*
try {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
// ignore
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: closeScanner failed to get a lease " + scannerId);
+ if (LOG.isTraceEnabled()) LOG.trace("closeScanner failed to get a lease " + scannerId);
}
*/
} catch(Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: closeScanner - txId " + transId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
+ if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
ce = e;
} catch(Throwable e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: closeScanner - txId " + transId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
+ if (LOG.isTraceEnabled()) LOG.trace("closeScanner - txId " + transId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
}
@@ -1594,12 +1477,6 @@ CoprocessorService, Coprocessor {
closeResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- closeResponseBuilder.setHasException(true);
- closeResponseBuilder.setException(wre.toString());
- }
-
if (ce != null)
{
closeResponseBuilder.setHasException(true);
@@ -1612,10 +1489,10 @@ CoprocessorService, Coprocessor {
{
closeResponseBuilder.setHasException(true);
closeResponseBuilder.setException(oop.toString());
- LOG.warn("TrxRegionEndpoint coprocessor: closeScanner - OutOfOrderProtocolException, transaction was not found, txId: " + transId + ",returned exception" + ", regionName " + regionInfo.getRegionNameAsString());
+ LOG.warn("closeScanner - OutOfOrderProtocolException, transaction was not found, txId: " + transId + ",returned exception" + ", region " + m_regionDetails);
}
else
- LOG.warn("TrxRegionEndpoint coprocessor: closeScanner - suppressing OutOfOrderProtocolException, transaction was not found, txId: " + transId + ", regionName " + regionInfo.getRegionNameAsString());
+ LOG.warn("closeScanner - suppressing OutOfOrderProtocolException, transaction was not found, txId: " + transId + ", region " + m_regionDetails);
}
CloseScannerResponse cresponse = closeResponseBuilder.build();
@@ -1636,34 +1513,20 @@ CoprocessorService, Coprocessor {
MutationType type;
Throwable t = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
long transactionId = request.getTransactionId();
+ long startId = request.getStartId();
- /* commenting it out for the time-being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
-
- // First test if this region matches our region name
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:deleteMultiple threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- }
- */
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
- LOG.warn("TrxRegionEndpoint coprocessor: deleteMultiple - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("deleteMultiple - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteMultiple - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
+ if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
mue = new MemoryUsageException("deleteMultiple memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
- if (mue == null && wre == null) {
+ if (mue == null) {
for (org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto proto : results)
{
delete = null;
@@ -1687,19 +1550,19 @@ CoprocessorService, Coprocessor {
if ((delete != null) && (t == null))
{
try {
- delete(transactionId, delete);
+ delete(transactionId, startId, delete);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("deleteMultiple - txId " + transactionId
+ ", Caught exception ", e);
t = e;
}
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteMultiple - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
+ if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - txId " + transactionId + ", region " + m_regionDetails + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
}
}
}
else
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteMultiple - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString() + ", delete proto was null");
+ if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - txId " + transactionId + ", region " + m_regionDetails + ", delete proto was null");
}
}
@@ -1714,15 +1577,9 @@ CoprocessorService, Coprocessor {
deleteMultipleTransactionalResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- deleteMultipleTransactionalResponseBuilder.setHasException(true);
- deleteMultipleTransactionalResponseBuilder.setException(wre.toString());
- }
-
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteMultiple - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("deleteMultiple - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
deleteMultipleTransactionalResponseBuilder.setHasException(true);
deleteMultipleTransactionalResponseBuilder.setException(mue.toString());
}
@@ -1744,8 +1601,8 @@ CoprocessorService, Coprocessor {
Boolean autoCommit = request.getAutoCommit();
Throwable t = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
long tid = request.getTid();
+ long commitId = request.getCommitId();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
@@ -1770,7 +1627,7 @@ CoprocessorService, Coprocessor {
// Process in local memory
if ((delete != null) && (t == null)){
try {
- deleteRegionTx(tid, delete, autoCommit);
+ deleteRegionTx(tid, tid, commitId, delete, autoCommit);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("deleteRegionTx - tid " + tid
+ ", Caught exception after internal deleteRegionTx - ", e);
@@ -1778,7 +1635,7 @@ CoprocessorService, Coprocessor {
}
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx - tid " + tid + ", regionName "
- + regionInfo.getRegionNameAsString() + ", type " + type + ", row "
+ + m_regionDetails + ", type " + type + ", row "
+ Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex "
+ Hex.encodeHexString(proto.getRow().toByteArray()));
}
@@ -1798,12 +1655,6 @@ CoprocessorService, Coprocessor {
deleteRegionTxResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- deleteRegionTxResponseBuilder.setHasException(true);
- deleteRegionTxResponseBuilder.setException(wre.toString());
- }
-
if (mue != null)
{
if (LOG.isTraceEnabled()) LOG.trace("deleteRegionTx - performing memoryPercentage "
@@ -1828,30 +1679,15 @@ CoprocessorService, Coprocessor {
Delete delete = null;
Throwable t = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
long transactionId = request.getTransactionId();
-
- /* commenting it out for the time-being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
-
- // First test if this region matches our region name
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:delete threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- }
- */
+ long startId = request.getStartId();
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
- LOG.warn("TrxRegionEndpoint coprocessor: delete - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("delete - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: delete - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("delete - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("delete memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
}
}
@@ -1870,14 +1706,14 @@ CoprocessorService, Coprocessor {
if ((delete != null) && (t == null))
{
try {
- delete(transactionId, delete);
+ delete(transactionId, startId, delete);
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("delete - txId " + transactionId
+ ", Caught exception after internal delete - ", e);
t = e;
}
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: delete - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
+ if (LOG.isTraceEnabled()) LOG.trace("delete - txId " + transactionId + ", region " + m_regionDetails + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray()));
}
}
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalResponse.Builder deleteTransactionalResponseBuilder = DeleteTransactionalResponse.newBuilder();
@@ -1890,15 +1726,9 @@ CoprocessorService, Coprocessor {
deleteTransactionalResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- deleteTransactionalResponseBuilder.setHasException(true);
- deleteTransactionalResponseBuilder.setException(wre.toString());
- }
-
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: delete - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("delete - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
deleteTransactionalResponseBuilder.setHasException(true);
deleteTransactionalResponseBuilder.setException(mue.toString());
}
@@ -1920,30 +1750,16 @@ CoprocessorService, Coprocessor {
Exception ge = null;
IOException gioe = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
org.apache.hadoop.hbase.client.Result result2 = null;
long transactionId = request.getTransactionId();
+ long startId = request.getStartId();
boolean exceptionThrown = false;
- /* commenting it out for the time-being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
-
- // First test if this region matches our region name
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:get threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- } else { */
-
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true)
- LOG.warn("TrxRegionEndpoint coprocessor: get - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("get - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
else {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
+ if (LOG.isTraceEnabled()) LOG.trace("get - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception");
mue = new MemoryUsageException("get memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId);
exceptionThrown = true;
}
@@ -1973,17 +1789,20 @@ CoprocessorService, Coprocessor {
String rowKey = Bytes.toString(row);
String getRowKey = Bytes.toString(getrow);
- LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Calling getScanner for regionName " + regionInfo.getRegionNameAsString() + ", row = " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) + ", getrow = " + Bytes.toStringBinary(getrow) + ", getrow in hex " + Hex.encodeHexString(getrow));
+ LOG.trace("get - txId " + transactionId +
+ ", Calling getScanner for region " + m_regionDetails +
+ ", row = " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) +
+ ", getrow = " + Bytes.toStringBinary(getrow) + ", getrow in hex " + Hex.encodeHexString(getrow));
}
- scanner = getScanner(transactionId, scan);
+ scanner = getScanner(transactionId, startId, scan);
if (scanner != null)
scanner.next(results);
result2 = Result.create(results);
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", getScanner result2 isEmpty is "
+ if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", getScanner result2 isEmpty is "
+ result2.isEmpty()
+ ", row "
+ Bytes.toStringBinary(result2.getRow())
@@ -1991,7 +1810,7 @@ CoprocessorService, Coprocessor {
+ result2.size());
} catch(Throwable e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
+ if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
}
finally {
@@ -1999,14 +1818,13 @@ CoprocessorService, Coprocessor {
try {
scanner.close();
} catch(Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
+ if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e));
ge = e;
}
}
}
} // ExceptionThrown
} // End of MemoryUsageCheck
- //} // End of WrongRegionCheck
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.GetTransactionalResponse.Builder getResponseBuilder = GetTransactionalResponse.newBuilder();
@@ -2016,9 +1834,9 @@ CoprocessorService, Coprocessor {
}
else
{
- if (t == null && wre == null && ge == null)
- gioe = new IOException("TrxRegionEndpoint coprocessor: get - result2 was null");
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", result2 was null ");
+ if (t == null && ge == null)
+ gioe = new IOException("get - result2 was null");
+ if (LOG.isTraceEnabled()) LOG.trace("get - txId " + transactionId + ", result2 was null ");
}
getResponseBuilder.setHasException(false);
@@ -2028,12 +1846,6 @@ CoprocessorService, Coprocessor {
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(t.toString());
}
-
- if (wre != null)
- {
- getResponseBuilder.setHasException(true);
- getResponseBuilder.setException(wre.toString());
- }
if (ge != null)
{
@@ -2049,7 +1861,7 @@ CoprocessorService, Coprocessor {
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
+ if (LOG.isTraceEnabled()) LOG.trace("get - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage exception");
getResponseBuilder.setHasException(true);
getResponseBuilder.setException(mue.toString());
}
@@ -2069,7 +1881,6 @@ CoprocessorService, Coprocessor {
RegionScanner scannert = null;
Throwable t = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
boolean exceptionThrown = false;
NullPointerException npe = null;
Exception ge = null;
@@ -2079,31 +1890,18 @@ CoprocessorService, Coprocessor {
long scannerId = 0L;
boolean isLoadingCfsOnDemandSet = false;
long transId = request.getTransactionId();
+ long startId = request.getStartId();
- /* commenting it out for the time-being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - ENTER txId " + transId + " in region " + m_regionDetails);
- // First test if this region matches our region name
-
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:openScanner threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- exceptionThrown = true;
- } else
- */
{
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
- LOG.warn("TrxRegionEndpoint coprocessor: openScanner - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("openScanner - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
exceptionThrown = true;
mue = new MemoryUsageException("openScanner memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transId);
}
@@ -2113,7 +1911,7 @@ CoprocessorService, Coprocessor {
try {
scan = ProtobufUtil.toScan(request.getScan());
if (scan == null)
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", scan was null");
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", scan was null");
} catch (Throwable e) {
if (LOG.isWarnEnabled()) LOG.warn("openScanner - txId " + transId
+ ", Caught exception ", e);
@@ -2123,8 +1921,8 @@ CoprocessorService, Coprocessor {
if (!exceptionThrown) {
if (scan == null) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", scan is null");
- npe = new NullPointerException("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", scan is null ");
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", scan is null");
+ npe = new NullPointerException("openScanner - txId " + transId + ", scan is null ");
ioe = new IOException("Invalid arguments to openScanner", npe);
exceptionThrown = true;
}
@@ -2146,36 +1944,36 @@ CoprocessorService, Coprocessor {
if (!exceptionThrown) {
try {
- scanner = getScanner(transId, scan);
+ scanner = getScanner(transId, startId, scan);
if (scanner != null) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", called getScanner, scanner is " + scanner);
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", called getScanner, scanner is " + scanner);
// Add the scanner to the map
scannerId = addScanner(transId, scanner, this.m_Region);
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", called addScanner, scanner id " + scannerId + ", regionName " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", called addScanner, scanner id " + scannerId + ", region " + m_regionDetails);
}
else
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", getScanner returned null, scanner id " + scannerId + ", regionName " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", getScanner returned null, scanner id " + scannerId + ", region " + m_regionDetails);
} catch (LeaseStillHeldException llse) {
/*
try {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: getScanner failed to get a lease " + scannerId);
+ if (LOG.isTraceEnabled()) LOG.trace("getScanner failed to get a lease " + scannerId);
}
*/
- LOG.error("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", getScanner Error opening scanner, " + llse.toString());
+ LOG.error("openScanner - txId " + transId + ", getScanner Error opening scanner, ", llse);
exceptionThrown = true;
lse = llse;
} catch (IOException e) {
- LOG.error("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", getScanner Error opening scanner, " + e.toString());
+ LOG.error("openScanner - txId " + transId + ", getScanner Error opening scanner, ", e);
exceptionThrown = true;
ioe = e;
}
}
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", scanner id " + scannerId + ", regionName " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - txId " + transId + ", scanner id " + scannerId + ", region " + m_regionDetails);
}
}
@@ -2190,12 +1988,6 @@ CoprocessorService, Coprocessor {
openResponseBuilder.setException(t.toString());
}
- if (wre != null)
- {
- openResponseBuilder.setHasException(true);
- openResponseBuilder.setException(wre.toString());
- }
-
if (ioe != null)
{
openResponseBuilder.setHasException(true);
@@ -2210,7 +2002,7 @@ CoprocessorService, Coprocessor {
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: openScanner - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("openScanner - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
openResponseBuilder.setHasException(true);
openResponseBuilder.setException(mue.toString());
}
@@ -2232,7 +2024,6 @@ CoprocessorService, Coprocessor {
OutOfOrderScannerNextException ooo = null;
UnknownScannerException use = null;
MemoryUsageException mue = null;
- WrongRegionException wre = null;
Exception ne = null;
Scan scan = null;
List<Cell> cellResults = new ArrayList<Cell>();
@@ -2241,6 +2032,7 @@ CoprocessorService, Coprocessor {
long scannerId = request.getScannerId();
long transId = request.getTransactionId();
+ long startId = request.getStartId();
int numberOfRows = request.getNumberOfRows();
boolean closeScanner = request.getCloseScanner();
long nextCallSeq = request.getNextCallSeq();
@@ -2250,31 +2042,17 @@ CoprocessorService, Coprocessor {
boolean exceptionThrown = false;
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", numberOfRows " + numberOfRows + ", nextCallSeq " + nextCallSeq + ", closeScanner is " + closeScanner + ", region is " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", numberOfRows " + numberOfRows
+ + ", nextCallSeq " + nextCallSeq + ", closeScanner is " + closeScanner + ", region is " + m_regionDetails);
- /* commenting it out for the time-being
- java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
-
- // First test if this region matches our region name
-
- if (!name.equals(regionInfo.getRegionNameAsString())) {
- wre = new WrongRegionException("Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:performScan threw WrongRegionException" +
- "Request Region Name, " +
- name + ", does not match this region, " +
- regionInfo.getRegionNameAsString());
- } else
- */
{
if (memoryThrottle == true) {
if(memoryUsageWarnOnly == true) {
- LOG.warn("TrxRegionEndpoint coprocessor: performScan - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
+ LOG.warn("performScan - performing memoryPercentage " + memoryPercentage + ", warning memory usage exceeds indicated percentage");
}
else {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage");
mue = new MemoryUsageException("performScan memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transId);
}
}
@@ -2291,9 +2069,8 @@ CoprocessorService, Coprocessor {
if (keyFound != true)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - Unknown transaction [" + transId
- + "] in region ["
- + m_Region.getRegionInfo().getRegionNameAsString()
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - Unknown transaction [" + transId
+ + "] in region [" + m_regionDetails
+ "], will create an OutOfOrderProtocol exception ");
oop = new OutOfOrderProtocolException("performScan does not have an active transaction with an open scanner, txId: " + transId);
}
@@ -2305,7 +2082,7 @@ CoprocessorService, Coprocessor {
if (scanner != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", scanner is not null");
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", scanner is not null");
while (shouldContinue) {
hasMore = scanner.next(cellResults);
result = Result.create(cellResults);
@@ -2319,22 +2096,23 @@ CoprocessorService, Coprocessor {
if (count == numberOfRows || !hasMore)
shouldContinue = false;
}
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", count is " + count + ", hasMore is " + hasMore + ", result " + result.isEmpty());
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", row count is " + count
+ + ", hasMore is " + hasMore + ", result " + result.isEmpty() + " region " + m_regionDetails);
}
else
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + transId + ", scanner is null");
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + transId + ", scanner is null");
}
} catch(OutOfOrderScannerNextException ooone) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + " Caught OutOfOrderScannerNextException " + ooone.getMessage() + " " + stackTraceToString(ooone));
+ LOG.error("performScan - txId " + transId + ", scanner id " + scannerId + " Caught OutOfOrderScannerNextException " + ooone.getMessage() + " " + stackTraceToString(ooone));
ooo = ooone;
exceptionThrown = true;
} catch(ScannerTimeoutException cste) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + " Caught ScannerTimeoutException " + cste.getMessage() + " " + stackTraceToString(cste));
+ LOG.error("performScan - txId " + transId + ", scanner id " + scannerId + " Caught ScannerTimeoutException " + cste.getMessage() + " " + stackTraceToString(cste));
ste = cste;
exceptionThrown = true;
} catch(Throwable e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + " Caught throwable exception " + e.getMessage() + " " + stackTraceToString(e));
+ LOG.error("performScan - txId " + transId + ", scanner id " + scannerId + " Caught throwable exception " + e.getMessage() + " " + stackTraceToString(e));
t = e;
exceptionThrown = true;
}
@@ -2342,7 +2120,7 @@ CoprocessorService, Coprocessor {
if (scanner != null) {
try {
if (closeScanner) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", close scanner was true, closing the scanner" + ", closeScanner is " + closeScanner + ", region is " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", scanner id " + scannerId + ", close scanner was true, closing the scanner" + ", closeScanner is " + closeScanner + ", region is " + m_regionDetails);
removeScanner(scannerId);
scanner.close();
/*
@@ -2350,12 +2128,12 @@ CoprocessorService, Coprocessor {
scannerLeases.cancelLease(getScannerLeaseId(scannerId));
} catch (LeaseException le) {
// ignore
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan failed to get a lease " + scannerId);
+ if (LOG.isTraceEnabled()) LOG.trace("performScan failed to get a lease " + scannerId);
}
*/
}
} catch(Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - transaction id " + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e));
+ LOG.error("performScan - transaction id " + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e));
ne = e;
exceptionThrown = true;
}
@@ -2370,7 +2148,7 @@ CoprocessorService, Coprocessor {
if (rsh == null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan rsh is null");
+ if (LOG.isTraceEnabled()) LOG.trace("performScan rsh is null");
use = new UnknownScannerException(
"ScannerId: " + scannerId + ", already closed?");
}
@@ -2380,16 +2158,16 @@ CoprocessorService, Coprocessor {
if (rsh == null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", performScan rsh is null, UnknownScannerException for scannerId: " + scannerId + ", nextCallSeq was " + nextCallSeq + ", for region " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - txId " + transId + ", performScan rsh is null, UnknownScannerException for scannerId: " + scannerId + ", nextCallSeq was " + nextCallSeq + ", for region " + m_regionDetails);
use = new UnknownScannerException(
- "ScannerId: " + scannerId + ", was scanner already closed?, transaction id " + transId + ", nextCallSeq was " + nextCallSeq + ", for region " + regionInfo.getRegionNameAsString());
+ "ScannerId: " + scannerId + ", was scanner already closed?, transaction id " + transId + ", nextCallSeq was " + nextCallSeq + ", for region " + m_regionDetails);
}
else
{
rsh.nextCallSeq = nextCallSeq;
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", regionName " + regionInfo.getRegionNameAsString() +
-", nextCallSeq " + nextCallSeq + ", rsh.nextCallSeq " + rsh.nextCallSeq + ", close scanner is " + closeScanner);
+ if (LOG.isTraceEnabled()) LOG.trace("performScan exit - txId " + transId + ", scanner id " + scannerId + " row count " + count + ", region " + m_regionDetails
+ + ", nextCallSeq " + nextCallSeq + ", rsh.nextCallSeq " + rsh.nextCallSeq + ", close scanner is " + closeScanner);
}
}
@@ -2426,13 +2204,6 @@ CoprocessorService, Coprocessor {
performResponseBuilder.setException(ste.toString());
}
- if (wre != null)
- {
- performResponseBuilder.setHasMore(false);
- performResponseBuilder.setHasException(true);
- performResponseBuilder.setException(wre.toString());
- }
-
if (ne != null)
{
performResponseBuilder.setHasMore(false);
@@ -2461,15 +2232,15 @@ CoprocessorService, Coprocessor {
{
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(oop.toString());
- LOG.warn("TrxRegionEndpoint coprocessor: performScan - OutOfOrderProtocolException, transaction was not found, txId: " + transId + ", return exception" + ", regionName " + regionInfo.getRegionNameAsString());
+ LOG.warn("performScan - OutOfOrderProtocolException, transaction was not found, txId: " + transId + ", return exception" + ", region " + m_regionDetails);
}
else
- LOG.warn("TrxRegionEndpoint coprocessor: performScan - suppressing OutOfOrderProtocolException, transaction was not found, txId: " + transId + ", regionName " + regionInfo.getRegionNameAsString());
+ LOG.warn("performScan - suppressing OutOfOrderProtocolException, transaction was not found, txId: " + transId + ", region " + m_regionDetails);
}
if (mue != null)
{
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+ if (LOG.isTraceEnabled()) LOG.trace("performScan - performing memoryPercentage " + memoryPercentage + ", posting memory usage exceeds indicated percentage");
performResponseBuilder.setHasMore(false);
performResponseBuilder.setHasException(true);
performResponseBuilder.setException(mue.toString());
@@ -2480,549 +2251,374 @@ CoprocessorService, Coprocessor {
}
public void deleteTlogEntries(RpcController controller,
- TlogDeleteRequest request, RpcCallback<TlogDeleteResponse> done) {
+ TlogDeleteRequest request, RpcCallback<TlogDeleteResponse> done) {
+ boolean hasMore = true;
+ InternalScanner scanner = null;
+ Throwable t = null;
+ ScannerTimeoutException ste = null;
+ OutOfOrderProtocolException oop = null;
+ OutOfOrderScannerNextException ooo = null;
+ UnknownScannerException use = null;
+ MemoryUsageException mue = null;
+ Exception ne = null;
+ Scan scan = null;
+ List<Cell> cellResults = new ArrayList<Cell>();
+ org.apache.hadoop.hbase.client.Result result = null;
+ long lvAsn = request.getAuditSeqNum();
+ boolean lvAgeCommitted = request.getAgeCommitted();
+
+ try{
+ scan = ProtobufUtil.toScan(request.getScan());
+ // use an internal scanner to perform scanning.
+ scanner = m_Region.getScanner(scan);
+ }
+ catch (Exception e){
+ if (LOG.isErrorEnabled()) LOG.error("deleteTlogEntries Exception in region: "
+ + m_regionDetails + " getting scanner ", e );
+ }
+
+ long count = 0L;
+ boolean shouldContinue = true;
+
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries ENTRY. Records older than " + lvAsn
+ + " will be deleted in region: " + m_regionDetails);
+
+ try {
+ if (scanner != null){
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - scanner is not null");
+ while (shouldContinue) {
+ hasMore = scanner.next(cellResults);
+ result = Result.create(cellResults);
+ if (!result.isEmpty()) {
+ for (Cell cell : result.rawCells()) {
+ String valueString = new String(CellUtil.cloneValue(cell));
+ StringTokenizer st = new StringTokenizer(valueString, ",");
+ if (st.hasMoreElements()) {
+ String asnToken = st.nextElement().toString();
+ String transidToken = st.nextElement().toString();
+ String stateToken = st.nextElement().toString();
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries transidToken: "
+ + transidToken + " asnToken: " + asnToken);
+ if (Long.parseLong(asnToken) < lvAsn) {
+ if ( (stateToken.contains(TransState.STATE_FORGOTTEN.toString())) ||
+ (stateToken.equals(TransState.STATE_COMMITTED.toString()) && (lvAgeCommitted)) ||
+ (stateToken.equals(TransState.STATE_ABORTED.toString()) && (lvAgeCommitted))) {
+
+ if (LOG.isTraceEnabled()) LOG.trace("Deleting transid: " + transidToken
+ + " from region: " + m_regionDetails + " with state: " + stateToken);
+ try {
+ Delete d = new Delete(result.getRow());
+ d.setDurability(Durability.SKIP_WAL);
+ m_Region.delete(d);
+ }
+ catch (Exception e) {
+ LOG.warn("deleteTlogEntries -"
+ + " txId " + transidToken + ", Executing delete caught an exception ", e);
+ throw new IOException("deleteTlogEntries -"
+ + " txId " + transidToken + ", Executing delete caught an exception " + e.toString());
+ }
+ count++;
+ }
+ } else {
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Ending scan at asn: " + asnToken
+ + ", transid: " + transidToken +
+ " because it is not less than the comparator: " + lvAsn +
+ " in region: " + m_regionDetails);
+ shouldContinue = false;
+ break;
+ }
+ } // if (st.hasMoreElements()
+ } // for (Cell cell : result.rawCells()
+ } // if (!result.isEmpty()
+ cellResults.clear();
+
+ if (!hasMore){
+ shouldContinue = false;
+ }
+ } // while (shouldContinue)
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - count is " + count + ", hasMore is " + hasMore
+ + ", result " + result.isEmpty());
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - scanner is null");
+ }
+ } catch(OutOfOrderScannerNextException ooone) {
+ LOG.warn("deleteTlogEntries - Caught OutOfOrderScannerNextException ", ooone);
+ ooo = ooone;
+ } catch(ScannerTimeoutException cste) {
+ LOG.warn("deleteTlogEntries - Caught ScannerTimeoutException ", cste);
+ ste = cste;
+ } catch(Throwable e) {
+ LOG.warn("deleteTlogEntries - Caught throwable exception ", e);
+ t = e;
+ } finally {
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries - closing the scanner, region is " + m_regionDetails);
+ try{
+ scanner.close();
+ }
+ catch(IOException ioe){
+ LOG.warn("deleteTlogEntries - except
<TRUNCATED>