You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2015/10/14 03:14:46 UTC
[03/11] incubator-trafodion git commit: TRAFODION-1521 Build
Trafodion without having HBase installed
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java b/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java
deleted file mode 100644
index 22ad0e3..0000000
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java
+++ /dev/null
@@ -1,1201 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.dtm;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.Logger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.transactional.TransactionManager;
-import org.apache.hadoop.hbase.client.transactional.TransactionState;
-import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
-import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
-import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
-import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation;
-import org.apache.hadoop.hbase.client.transactional.TransState;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.LocalHBaseCluster;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-
-public class TmAuditTlog {
-
- static final Log LOG = LogFactory.getLog(TmAuditTlog.class);
- private static HBaseAdmin admin;
- private Configuration config;
- private static String TLOG_TABLE_NAME;
- private static final byte[] TLOG_FAMILY = Bytes.toBytes("tf");
- private static final byte[] ASN_STATE = Bytes.toBytes("as");
- private static final byte[] QUAL_TX_STATE = Bytes.toBytes("tx");
- private static HTable[] table;
- private static HBaseAuditControlPoint tLogControlPoint;
- private static long tLogControlPointNum;
- private static long tLogHashKey;
- private static int tLogHashShiftFactor;
- private int dtmid;
-
- // For performance metrics
- private static long[] startTimes;
- private static long[] endTimes;
- private static long[] synchTimes;
- private static long[] bufferSizes;
- private static AtomicInteger timeIndex;
- private static long totalWriteTime;
- private static long totalSynchTime;
- private static long totalPrepTime;
- private static AtomicLong totalWrites;
- private static AtomicLong totalRecords;
- private static long minWriteTime;
- private static long minWriteTimeBuffSize;
- private static long maxWriteTime;
- private static long maxWriteTimeBuffSize;
- private static double avgWriteTime;
- private static long minPrepTime;
- private static long maxPrepTime;
- private static double avgPrepTime;
- private static long minSynchTime;
- private static long maxSynchTime;
- private static double avgSynchTime;
- private static long minBufferSize;
- private static long maxBufferSize;
- private static double avgBufferSize;
-
- private static int versions;
- private static int tlogNumLogs;
- private boolean useAutoFlush;
- private static boolean ageCommitted;
- private static boolean forceControlPoint;
- private boolean disableBlockCache;
- private boolean controlPointDeferred;
-
- private static AtomicLong asn; // Audit sequence number is the monotonic increasing value of the tLog write
-
- private static Object tlogAuditLock[]; // Lock for synchronizing access via regions.
-
- private static Object tablePutLock; // Lock for synchronizing table.put operations
- // to avoid ArrayIndexOutOfBoundsException
- private static byte filler[];
-
- public static final int TM_TX_STATE_NOTX = 0; //S0 - NOTX
- public static final int TM_TX_STATE_ACTIVE = 1; //S1 - ACTIVE
- public static final int TM_TX_STATE_FORGOTTEN = 2; //N/A
- public static final int TM_TX_STATE_COMMITTED = 3; //N/A
- public static final int TM_TX_STATE_ABORTING = 4; //S4 - ROLLBACK
- public static final int TM_TX_STATE_ABORTED = 5; //S4 - ROLLBACK
- public static final int TM_TX_STATE_COMMITTING = 6; //S3 - PREPARED
- public static final int TM_TX_STATE_PREPARING = 7; //S2 - IDLE
- public static final int TM_TX_STATE_FORGETTING = 8; //N/A
- public static final int TM_TX_STATE_PREPARED = 9; //S3 - PREPARED XARM Branches only!
- public static final int TM_TX_STATE_FORGETTING_HEUR = 10; //S5 - HEURISTIC
- public static final int TM_TX_STATE_BEGINNING = 11; //S1 - ACTIVE
- public static final int TM_TX_STATE_HUNGCOMMITTED = 12; //N/A
- public static final int TM_TX_STATE_HUNGABORTED = 13; //S4 - ROLLBACK
- public static final int TM_TX_STATE_IDLE = 14; //S2 - IDLE XARM Branches only!
- public static final int TM_TX_STATE_FORGOTTEN_HEUR = 15; //S5 - HEURISTIC - Waiting Superior TM xa_forget request
- public static final int TM_TX_STATE_ABORTING_PART2 = 16; // Internal State
- public static final int TM_TX_STATE_TERMINATING = 17;
- public static final int TM_TX_STATE_LAST = 17;
-
- private class AuditBuffer{
- private ArrayList<Put> buffer; // Each Put is an audit record
-
- private AuditBuffer () {
- buffer = new ArrayList<Put>();
- buffer.clear();
-
- }
-
- private void bufferAdd(Put localPut) throws Exception {
- long threadId = Thread.currentThread().getId();
- if (LOG.isTraceEnabled()) LOG.trace("BufferAdd start in thread " + threadId );
- try {
- buffer.add(localPut);
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferAdd" + e);
- throw e;
- }
- if (LOG.isTraceEnabled()) LOG.trace("BufferAdd end in thread " + threadId );
- }
-
- private int bufferSize() throws Exception {
- int lvSize;
- long threadId = Thread.currentThread().getId();
- if (LOG.isTraceEnabled()) LOG.trace("BufferSize start in thread " + threadId );
- try {
- lvSize = buffer.size();
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferSize" + e);
- throw e;
- }
- if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferSize end; returning " + lvSize + " in thread "
- + Thread.currentThread().getId());
- return lvSize;
- }
-
- private void bufferClear() throws Exception {
- long threadId = Thread.currentThread().getId();
- if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferClear start in thread " + threadId);
- try {
- buffer.clear();
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("Exception trying bufferClear.clear" + e);
- throw e;
- }
- if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferClear end in thread " + threadId);
- }
-
- private ArrayList<Put> getBuffer() throws Exception {
- long threadId = Thread.currentThread().getId();
- if (LOG.isTraceEnabled()) LOG.trace("getBuffer start in thread " + threadId );
- return this.buffer;
- }
- }// End of class AuditBuffer
-
- public class TmAuditTlogRegionSplitPolicy extends RegionSplitPolicy {
-
- @Override
- protected boolean shouldSplit(){
- return false;
- }
- }
-
- public TmAuditTlog (Configuration config) throws IOException, RuntimeException {
-
- this.config = config;
- this.dtmid = Integer.parseInt(config.get("dtmid"));
- if (LOG.isTraceEnabled()) LOG.trace("Enter TmAuditTlog constructor for dtmid " + dtmid);
- TLOG_TABLE_NAME = config.get("TLOG_TABLE_NAME");
- int fillerSize = 2;
- controlPointDeferred = false;
-
- forceControlPoint = false;
- try {
- String controlPointFlush = System.getenv("TM_TLOG_FLUSH_CONTROL_POINT");
- if (controlPointFlush != null){
- forceControlPoint = (Integer.parseInt(controlPointFlush) != 0);
- if (LOG.isTraceEnabled()) LOG.trace("controlPointFlush != null");
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FLUSH_CONTROL_POINT is not in ms.env");
- }
- LOG.info("forceControlPoint is " + forceControlPoint);
-
- useAutoFlush = true;
- try {
- String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH");
- if (autoFlush != null){
- useAutoFlush = (Integer.parseInt(autoFlush) != 0);
- if (LOG.isTraceEnabled()) LOG.trace("autoFlush != null");
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AUTO_FLUSH is not in ms.env");
- }
- LOG.info("useAutoFlush is " + useAutoFlush);
-
- ageCommitted = false;
- try {
- String ageCommittedRecords = System.getenv("TM_TLOG_AGE_COMMITTED_RECORDS");
- if (ageCommittedRecords != null){
- ageCommitted = (Integer.parseInt(ageCommittedRecords) != 0);
- if (LOG.isTraceEnabled()) LOG.trace("ageCommittedRecords != null");
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AGE_COMMITTED_RECORDS is not in ms.env");
- }
- LOG.info("ageCommitted is " + ageCommitted);
-
- versions = 5;
- try {
- String maxVersions = System.getenv("TM_TLOG_MAX_VERSIONS");
- if (maxVersions != null){
- versions = (Integer.parseInt(maxVersions) > versions ? Integer.parseInt(maxVersions) : versions);
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_MAX_VERSIONS is not in ms.env");
- }
-
- tlogNumLogs = 1;
- try {
- String numLogs = System.getenv("TM_TLOG_NUM_LOGS");
- if (numLogs != null) {
- tlogNumLogs = Math.max( 1, Integer.parseInt(numLogs));
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_NUM_LOGS is not in ms.env");
- }
- disableBlockCache = false;
- try {
- String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
- if (blockCacheString != null){
- disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
- if (LOG.isTraceEnabled()) LOG.trace("disableBlockCache != null");
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not in ms.env");
- }
- LOG.info("disableBlockCache is " + disableBlockCache);
-
- switch (tlogNumLogs) {
- case 1:
- tLogHashKey = 0b0;
- tLogHashShiftFactor = 63;
- break;
- case 2:
- tLogHashKey = 0b1;
- tLogHashShiftFactor = 63;
- break;
- case 4:
- tLogHashKey = 0b11;
- tLogHashShiftFactor = 62;
- break;
- case 8:
- tLogHashKey = 0b111;
- tLogHashShiftFactor = 61;
- break;
- case 16:
- tLogHashKey = 0b1111;
- tLogHashShiftFactor = 60;
- break;
- case 32:
- tLogHashKey = 0b11111;
- tLogHashShiftFactor = 59;
- break;
- default : {
- LOG.error("TM_TLOG_NUM_LOGS must b 1 or a power of 2 in the range 2-32");
- throw new RuntimeException();
- }
- }
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_NUM_LOGS is " + tlogNumLogs);
-
- HColumnDescriptor hcol = new HColumnDescriptor(TLOG_FAMILY);
- if (disableBlockCache) {
- hcol.setBlockCacheEnabled(false);
- }
- hcol.setMaxVersions(versions);
- admin = new HBaseAdmin(config);
-
- filler = new byte[fillerSize];
- Arrays.fill(filler, (byte) ' ');
- startTimes = new long[50];
- endTimes = new long[50];
- synchTimes = new long[50];
- bufferSizes = new long[50];
- totalWriteTime = 0;
- totalSynchTime = 0;
- totalPrepTime = 0;
- totalWrites = new AtomicLong(0);
- totalRecords = new AtomicLong(0);
- minWriteTime = 1000000000;
- minWriteTimeBuffSize = 0;
- maxWriteTime = 0;
- maxWriteTimeBuffSize = 0;
- avgWriteTime = 0;
- minPrepTime = 1000000000;
- maxPrepTime = 0;
- avgPrepTime = 0;
- minSynchTime = 1000000000;
- maxSynchTime = 0;
- avgSynchTime = 0;
- minBufferSize = 1000;
- maxBufferSize = 0;
- avgBufferSize = 0;
- timeIndex = new AtomicInteger(1);
-
- asn = new AtomicLong(); // Monotonically increasing count of write operations
-
- long lvAsn = 0;
-
- try {
- if (LOG.isTraceEnabled()) LOG.trace("try new HBaseAuditControlPoint");
- tLogControlPoint = new HBaseAuditControlPoint(config);
- }
- catch (Exception e) {
- LOG.error("Unable to create new HBaseAuditControlPoint object " + e);
- }
-
- tlogAuditLock = new Object[tlogNumLogs];
- table = new HTable[tlogNumLogs];
-
- try {
- // Get the asn from the last control point. This ignores
- // any asn increments between the last control point
- // write and a system crash and could result in asn numbers
- // being reused. However this would just mean that some old
- // records are held onto a bit longer before cleanup and is safe.
- asn.set(tLogControlPoint.getStartingAuditSeqNum());
- }
- catch (Exception e2){
- if (LOG.isDebugEnabled()) LOG.debug("Exception setting the ASN " + e2);
- if (LOG.isDebugEnabled()) LOG.debug("Setting the ASN to 1");
- asn.set(1L); // Couldn't read the asn so start asn at 1
- }
-
- for (int i = 0 ; i < tlogNumLogs; i++) {
- tlogAuditLock[i] = new Object();
- String lv_tLogName = new String(TLOG_TABLE_NAME + "_LOG_" + Integer.toHexString(i));
- boolean lvTlogExists = admin.tableExists(lv_tLogName);
- if (LOG.isTraceEnabled()) LOG.trace("Tlog table " + lv_tLogName + (lvTlogExists? " exists" : " does not exist" ));
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(lv_tLogName));
- desc.addFamily(hcol);
-
- if (lvTlogExists == false) {
- // Need to prime the asn for future writes
- try {
- if (LOG.isTraceEnabled()) LOG.trace("Creating the table " + lv_tLogName);
- admin.createTable(desc);
- asn.set(1L); // TLOG didn't exist previously, so start asn at 1
- }
- catch (TableExistsException e) {
- LOG.error("Table " + lv_tLogName + " already exists");
- }
- }
- try {
- if (LOG.isTraceEnabled()) LOG.trace("try new HTable index " + i);
- table[i] = new HTable(config, desc.getName());
- }
- catch(Exception e){
- LOG.error("TmAuditTlog Exception on index " + i + "; " + e);
- throw new RuntimeException(e);
- }
-
- table[i].setAutoFlushTo(this.useAutoFlush);
-
- }
-
- lvAsn = asn.get();
- // This control point write needs to be delayed until after recovery completes,
- // but is here as a placeholder
- if (LOG.isTraceEnabled()) LOG.trace("Starting a control point with asn value " + lvAsn);
- tLogControlPointNum = tLogControlPoint.doControlPoint(lvAsn);
-
- if (LOG.isTraceEnabled()) LOG.trace("Exit constructor()");
- return;
- }
-
- public long getNextAuditSeqNum(int nid) throws IOException{
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum node: " + nid);
- return tLogControlPoint.getNextAuditSeqNum(nid);
- }
-
- public static long asnGetAndIncrement () {
- if (LOG.isTraceEnabled()) LOG.trace("asnGetAndIncrement");
- return asn.getAndIncrement();
- }
-
- public void putSingleRecord(final long lvTransid, final long lvCommitId, final String lvTxState, final Set<TransactionRegionLocation> regions, boolean forced) throws Exception {
- putSingleRecord(lvTransid, lvCommitId, lvTxState, regions, forced, -1);
- }
-
- public void putSingleRecord(final long lvTransid, final long lvCommitId, final String lvTxState, final Set<TransactionRegionLocation> regions, boolean forced, long recoveryASN) throws Exception {
- long threadId = Thread.currentThread().getId();
- if (LOG.isTraceEnabled()) LOG.trace("putSingleRecord start in thread " + threadId);
- StringBuilder tableString = new StringBuilder();
- String transidString = new String(String.valueOf(lvTransid));
- String commitIdString = new String(String.valueOf(lvCommitId));
- boolean lvResult = true;
- long lvAsn;
- long startSynch = 0;
- long endSynch = 0;
- int lv_lockIndex = 0;
- int lv_TimeIndex = (timeIndex.getAndIncrement() % 50 );
- long lv_TotalWrites = totalWrites.incrementAndGet();
- long lv_TotalRecords = totalRecords.incrementAndGet();
- if (regions != null) {
- // Regions passed in indicate a state record where recovery might be needed following a crash.
- // To facilitate branch notification we translate the regions into table names that can then
- // be translated back into new region names following a restart. THis allows us to ensure all
- // branches reply prior to cleanup
- Iterator<TransactionRegionLocation> it = regions.iterator();
- List<String> tableNameList = new ArrayList<String>();
- while (it.hasNext()) {
- String name = new String(it.next().getRegionInfo().getTable().getNameAsString());
- if ((name.length() > 0) && (tableNameList.contains(name) != true)){
- // We have a table name not already in the list
- tableNameList.add(name);
- tableString.append(",");
- tableString.append(name);
- }
- }
- if (LOG.isTraceEnabled()) LOG.trace("table names: " + tableString.toString());
- }
- //Create the Put as directed by the hashed key boolean
- Put p;
-
- //create our own hashed key
- long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
- lv_lockIndex = (int)(lvTransid & tLogHashKey);
- if (LOG.isTraceEnabled()) LOG.trace("key: " + key + ", hex: " + Long.toHexString(key) + ", transid: " + lvTransid);
- p = new Put(Bytes.toBytes(key));
-
- if (recoveryASN == -1){
- // This is a normal audit record so we manage the ASN
- lvAsn = asn.getAndIncrement();
- }
- else {
- // This is a recovery audit record so use the ASN passed in
- lvAsn = recoveryASN;
- }
- if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState + " ASN: " + lvAsn);
- p.add(TLOG_FAMILY, ASN_STATE, Bytes.toBytes(String.valueOf(lvAsn) + ","
- + transidString + "," + lvTxState
- + "," + Bytes.toString(filler)
- + "," + commitIdString
- + "," + tableString.toString()));
-
-
- if (recoveryASN != -1){
- // We need to send this to a remote Tlog, not our local one, so open the appropriate table
- HTableInterface recoveryTable;
- int lv_ownerNid = (int)(lvTransid >> 32);
- String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(lv_ownerNid) + "_LOG_" + Integer.toHexString(lv_lockIndex));
- HConnection recoveryTableConnection = HConnectionManager.createConnection(this.config);
- recoveryTable = recoveryTableConnection.getTable(TableName.valueOf(lv_tLogName));
-
- try {
- recoveryTable.put(p);
- }
- catch (Exception e2){
- // create record of the exception
- LOG.error("putSingleRecord Exception in recoveryTable" + e2);
- e2.printStackTrace();
- throw e2;
- }
- finally {
- try {
- recoveryTable.close();
- recoveryTableConnection.close();
- }
- catch (IOException e) {
- LOG.error("putSingleRecord IOException closing recovery table or connection for table " + lv_tLogName);
- e.printStackTrace();
- }
- }
- }
- else {
- // THis goes to our local TLOG
- if (LOG.isTraceEnabled()) LOG.trace("TLOG putSingleRecord synchronizing tlogAuditLock[" + lv_lockIndex + "] in thread " + threadId );
- startSynch = System.nanoTime();
- try {
- synchronized (tlogAuditLock[lv_lockIndex]) {
- endSynch = System.nanoTime();
- try {
- if (LOG.isTraceEnabled()) LOG.trace("try table.put " + p );
- startTimes[lv_TimeIndex] = System.nanoTime();
- table[lv_lockIndex].put(p);
- if ((forced) && (useAutoFlush == false)) {
- if (LOG.isTraceEnabled()) LOG.trace("flushing commits");
- table[lv_lockIndex].flushCommits();
- }
- endTimes[lv_TimeIndex] = System.nanoTime();
- }
- catch (Exception e2){
- // create record of the exception
- LOG.error("putSingleRecord Exception " + e2);
- e2.printStackTrace();
- throw e2;
- }
- } // End global synchronization
- }
- catch (Exception e) {
- // create record of the exception
- LOG.error("Synchronizing on tlogAuditLock[" + lv_lockIndex + "] Exception " + e);
- e.printStackTrace();
- throw e;
- }
- if (LOG.isTraceEnabled()) LOG.trace("TLOG putSingleRecord synchronization complete in thread " + threadId );
-
- synchTimes[lv_TimeIndex] = endSynch - startSynch;
- totalSynchTime += synchTimes[lv_TimeIndex];
- totalWriteTime += (endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]);
- if (synchTimes[lv_TimeIndex] > maxSynchTime) {
- maxSynchTime = synchTimes[lv_TimeIndex];
- }
- if (synchTimes[lv_TimeIndex] < minSynchTime) {
- minSynchTime = synchTimes[lv_TimeIndex];
- }
- if ((endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]) > maxWriteTime) {
- maxWriteTime = (endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]);
- }
- if ((endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]) < minWriteTime) {
- minWriteTime = (endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]);
- }
-
- if (lv_TimeIndex == 49) {
- timeIndex.set(1); // Start over so we don't exceed the array size
- }
-
- if (lv_TotalWrites == 59999) {
- avgWriteTime = (double) (totalWriteTime/lv_TotalWrites);
- avgSynchTime = (double) (totalSynchTime/lv_TotalWrites);
- LOG.info("TLog Audit Write Report\n" +
- " Total records: "
- + lv_TotalRecords + " in " + lv_TotalWrites + " write operations\n" +
- " Write time:\n" +
- " Min: "
- + minWriteTime / 1000 + " microseconds\n" +
- " Max: "
- + maxWriteTime / 1000 + " microseconds\n" +
- " Avg: "
- + avgWriteTime / 1000 + " microseconds\n" +
- " Synch time:\n" +
- " Min: "
- + minSynchTime / 1000 + " microseconds\n" +
- " Max: "
- + maxSynchTime / 1000 + " microseconds\n" +
- " Avg: "
- + avgSynchTime / 1000 + " microseconds\n");
-
- // Start at index 1 since there is no startTimes[0]
- timeIndex.set(1);
- endTimes[0] = System.nanoTime();
- totalWriteTime = 0;
- totalSynchTime = 0;
- totalPrepTime = 0;
- totalRecords.set(0);
- totalWrites.set(0);
- minWriteTime = 50000; // Some arbitrary high value
- maxWriteTime = 0;
- minWriteTimeBuffSize = 0;
- maxWriteTimeBuffSize = 0;
- minSynchTime = 50000; // Some arbitrary high value
- maxSynchTime = 0;
- minPrepTime = 50000; // Some arbitrary high value
- maxPrepTime = 0;
- minBufferSize = 1000; // Some arbitrary high value
- maxBufferSize = 0;
- }
- }// End else revoveryASN == -1
- if (LOG.isTraceEnabled()) LOG.trace("putSingleRecord exit");
- }
-
- public static int getRecord(final long lvTransid) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getRecord start");
- TransState lvTxState = TransState.STATE_NOTX;
- String stateString;
- int lv_lockIndex = (int)(lvTransid & tLogHashKey);
- try {
- String transidString = new String(String.valueOf(lvTransid));
- Get g;
- //create our own hashed key
- long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
- if (LOG.isTraceEnabled()) LOG.trace("key: " + key + " hex: " + Long.toHexString(key));
- g = new Get(Bytes.toBytes(key));
- try {
- Result r = table[lv_lockIndex].get(g);
- byte [] value = r.getValue(TLOG_FAMILY, ASN_STATE);
- stateString = new String (Bytes.toString(value));
- if (LOG.isTraceEnabled()) LOG.trace("stateString is " + stateString);
- if (stateString.compareTo("COMMITTED") == 0){
- lvTxState = TransState.STATE_COMMITTED;
- }
- else if (stateString.compareTo("ABORTED") == 0){
- lvTxState = TransState.STATE_ABORTED;
- }
- else if (stateString.compareTo("ACTIVE") == 0){
- lvTxState = TransState.STATE_ACTIVE;
- }
- else if (stateString.compareTo("PREPARED") == 0){
- lvTxState = TransState.STATE_PREPARED;
- }
- else if (stateString.compareTo("NOTX") == 0){
- lvTxState = TransState.STATE_NOTX;
- }
- else if (stateString.compareTo("FORGOTTEN") == 0){
- lvTxState = TransState.STATE_FORGOTTEN;
- }
- else if (stateString.compareTo("ABORTING") == 0){
- lvTxState = TransState.STATE_ABORTING;
- }
- else if (stateString.compareTo("COMMITTING") == 0){
- lvTxState = TransState.STATE_COMMITTING;
- }
- else if (stateString.compareTo("PREPARING") == 0){
- lvTxState = TransState.STATE_PREPARING;
- }
- else if (stateString.compareTo("FORGETTING") == 0){
- lvTxState = TransState.STATE_FORGETTING;
- }
- else if (stateString.compareTo("FORGETTING_HEUR") == 0){
- lvTxState = TransState.STATE_FORGETTING_HEUR;
- }
- else if (stateString.compareTo("BEGINNING") == 0){
- lvTxState = TransState.STATE_BEGINNING;
- }
- else if (stateString.compareTo("HUNGCOMMITTED") == 0){
- lvTxState = TransState.STATE_HUNGCOMMITTED;
- }
- else if (stateString.compareTo("HUNGABORTED") == 0){
- lvTxState = TransState.STATE_HUNGABORTED;
- }
- else if (stateString.compareTo("IDLE") == 0){
- lvTxState = TransState.STATE_IDLE;
- }
- else if (stateString.compareTo("FORGOTTEN_HEUR") == 0){
- lvTxState = TransState.STATE_FORGOTTEN_HEUR;
- }
- else if (stateString.compareTo("ABORTING_PART2") == 0){
- lvTxState = TransState.STATE_ABORTING_PART2;
- }
- else if (stateString.compareTo("TERMINATING") == 0){
- lvTxState = TransState.STATE_TERMINATING;
- }
- else {
- lvTxState = TransState.STATE_BAD;
- }
-
- if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState);
- }
- catch (IOException e){
- LOG.error("getRecord IOException");
- throw e;
- }
- catch (Exception e){
- LOG.error("getRecord Exception");
- throw e;
- }
- }
- catch (Exception e2) {
- LOG.error("getRecord Exception2 " + e2);
- e2.printStackTrace();
- }
-
- if (LOG.isTraceEnabled()) LOG.trace("getRecord end; returning " + lvTxState);
- return lvTxState.getValue();
- }
-
- public static String getRecord(final String transidString) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getRecord start");
- long lvTransid = Long.parseLong(transidString, 10);
- int lv_lockIndex = (int)(lvTransid & tLogHashKey);
- String lvTxState = new String("NO RECORD");
- try {
- Get g;
- //create our own hashed key
- long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
- if (LOG.isTraceEnabled()) LOG.trace("key: " + key + " hex: " + Long.toHexString(key));
- g = new Get(Bytes.toBytes(key));
- try {
- Result r = table[lv_lockIndex].get(g);
- byte [] value = r.getValue(TLOG_FAMILY, ASN_STATE);
- StringTokenizer st = new StringTokenizer(value.toString(), ",");
- String asnToken = st.nextElement().toString();
- String transidToken = st.nextElement().toString();
- lvTxState = st.nextElement().toString();
- if (LOG.isTraceEnabled()) LOG.trace("transid: " + transidToken + " state: " + lvTxState);
- } catch (IOException e){
- LOG.error("getRecord IOException");
- throw e;
- }
- } catch (Exception e){
- LOG.error("getRecord Exception " + e);
- throw e;
- }
- if (LOG.isTraceEnabled()) LOG.trace("getRecord end; returning String:" + lvTxState);
- return lvTxState;
- }
-
-
- public static boolean deleteRecord(final long lvTransid) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start " + lvTransid);
- String transidString = new String(String.valueOf(lvTransid));
- int lv_lockIndex = (int)(lvTransid & tLogHashKey);
- try {
- Delete d;
- //create our own hashed key
- long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
- if (LOG.isTraceEnabled()) LOG.trace("key: " + key + " hex: " + Long.toHexString(key));
- d = new Delete(Bytes.toBytes(key));
- if (LOG.isTraceEnabled()) LOG.trace("deleteRecord (" + lvTransid + ") ");
- table[lv_lockIndex].delete(d);
- }
- catch (Exception e) {
- LOG.error("deleteRecord Exception " + e );
- }
- if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit");
- return true;
- }
-
- public boolean deleteAgedEntries(final long lvAsn) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries start: Entries older than " + lvAsn + " will be removed");
- HTableInterface deleteTable;
- for (int i = 0; i < tlogNumLogs; i++) {
- String lv_tLogName = new String(TLOG_TABLE_NAME + "_LOG_" + Integer.toHexString(i));
-// Connection deleteConnection = ConnectionFactory.createConnection(this.config);
-
- HConnection deleteConnection = HConnectionManager.createConnection(this.config);
-
- deleteTable = deleteConnection.getTable(TableName.valueOf(lv_tLogName));
- try {
- Scan s = new Scan();
- s.setCaching(100);
- s.setCacheBlocks(false);
- ArrayList<Delete> deleteList = new ArrayList<Delete>();
- ResultScanner ss = deleteTable.getScanner(s);
-
- try {
- for (Result r : ss) {
- for (Cell cell : r.rawCells()) {
- String valueString = new String(CellUtil.cloneValue(cell));
- StringTokenizer st = new StringTokenizer(valueString, ",");
- if (st.hasMoreElements()) {
- String asnToken = st.nextElement().toString() ;
- String transidToken = st.nextElement().toString() ;
- String stateToken = st.nextElement().toString() ;
- if ((Long.parseLong(asnToken) < lvAsn) && (stateToken.equals("FORGOTTEN"))) {
- String rowKey = new String(r.getRow());
- Delete del = new Delete(r.getRow());
- if (LOG.isTraceEnabled()) LOG.trace("adding transid: " + transidToken + " to delete list");
- deleteList.add(del);
- }
- else if ((Long.parseLong(asnToken) < lvAsn) &&
- (stateToken.equals("COMMITTED") || stateToken.equals("ABORTED"))) {
- if (ageCommitted) {
- String rowKey = new String(r.getRow());
- Delete del = new Delete(r.getRow());
- if (LOG.isTraceEnabled()) LOG.trace("adding transid: " + transidToken + " to delete list");
- deleteList.add(del);
- }
- else {
- String key = new String(r.getRow());
- Get get = new Get(r.getRow());
- get.setMaxVersions(versions); // will return last n versions of row
- Result lvResult = deleteTable.get(get);
- // byte[] b = lvResult.getValue(TLOG_FAMILY, ASN_STATE); // returns current version of value
- List<Cell> list = lvResult.getColumnCells(TLOG_FAMILY, ASN_STATE); // returns all versions of this column
- for (Cell element : list) {
- String value = new String(CellUtil.cloneValue(element));
- StringTokenizer stok = new StringTokenizer(value, ",");
- if (stok.hasMoreElements()) {
- if (LOG.isTraceEnabled()) LOG.trace("Performing secondary search on (" + transidToken + ")");
- asnToken = stok.nextElement().toString() ;
- transidToken = stok.nextElement().toString() ;
- stateToken = stok.nextElement().toString() ;
- if ((Long.parseLong(asnToken) < lvAsn) && (stateToken.equals("FORGOTTEN"))) {
- Delete del = new Delete(r.getRow());
- if (LOG.isTraceEnabled()) LOG.trace("Secondary search found new delete - adding (" + transidToken + ") with asn: " + asnToken + " to delete list");
- deleteList.add(del);
- break;
- }
- else {
- if (LOG.isTraceEnabled()) LOG.trace("Secondary search skipping entry with asn: " + asnToken + ", state: "
- + stateToken + ", transid: " + transidToken );
- }
- }
- }
- }
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries skipping asn: " + asnToken + ", transid: "
- + transidToken + ", state: " + stateToken);
- }
- }
- }
- }
- }
- catch(Exception e){
- LOG.error("deleteAgedEntries Exception getting results for table index " + i + "; " + e);
- throw new RuntimeException(e);
- }
- finally {
- ss.close();
- }
- if (LOG.isTraceEnabled()) LOG.trace("attempting to delete list with " + deleteList.size() + " elements");
- try {
- deleteTable.delete(deleteList);
- }
- catch(IOException e){
- LOG.error("deleteAgedEntries Exception deleting from table index " + i + "; " + e);
- throw new RuntimeException(e);
- }
- }
- catch (IOException e) {
- LOG.error("deleteAgedEntries IOException setting up scan on table index " + i);
- e.printStackTrace();
- }
- finally {
- try {
- deleteTable.close();
- deleteConnection.close();
- }
- catch (IOException e) {
- LOG.error("deleteAgedEntries IOException closing table or connection for table index " + i);
- e.printStackTrace();
- }
- }
- }
- if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries - exit");
- return true;
- }
-
- public long writeControlPointRecords (final Map<Long, TransactionState> map) throws IOException, Exception {
- int lv_lockIndex;
- int cpWrites = 0;
- long startTime = System.nanoTime();
- long endTime;
-
- if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords start with map size " + map.size());
-
- try {
- for (Map.Entry<Long, TransactionState> e : map.entrySet()) {
- try {
- Long transid = e.getKey();
- lv_lockIndex = (int)(transid & tLogHashKey);
- TransactionState value = e.getValue();
- if (value.getStatus().equals("COMMITTED")){
- if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords adding record for trans (" + transid + ") : state is " + value.getStatus());
- cpWrites++;
- if (forceControlPoint) {
- putSingleRecord(transid, value.getCommitId(), value.getStatus(), value.getParticipatingRegions(), true);
- }
- else {
- putSingleRecord(transid, value.getCommitId(), value.getStatus(), value.getParticipatingRegions(), false);
- }
- }
- }
- catch (Exception ex) {
- LOG.error("formatRecord Exception " + ex);
- ex.printStackTrace();
- throw ex;
- }
- }
- } catch (ConcurrentModificationException cme){
- LOG.info("writeControlPointRecords ConcurrentModificationException; delaying control point ");
- // Return the current value rather than incrementing this interval.
- controlPointDeferred = true;
- return tLogControlPoint.getCurrControlPt() - 1;
- }
-
- endTime = System.nanoTime();
- if (LOG.isDebugEnabled()) LOG.debug("TLog Control Point Write Report\n" +
- " Total records: "
- + map.size() + " in " + cpWrites + " write operations\n" +
- " Write time: " + (endTime - startTime) / 1000 + " microseconds\n" );
-
- if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords exit ");
- return -1L;
-
- }
-
-
- public long addControlPoint (final Map<Long, TransactionState> map) throws IOException, Exception {
- if (LOG.isTraceEnabled()) LOG.trace("addControlPoint start with map size " + map.size());
- long lvCtrlPt = 0L;
- long agedAsn; // Writes older than this audit seq num will be deleted
- long lvAsn; // local copy of the asn
- long key;
- boolean success = false;
-
- if (controlPointDeferred) {
- // We deferred the control point once already due to concurrency. We'll synchronize this timeIndex
- synchronized (map) {
- if (LOG.isTraceEnabled()) LOG.trace("Writing synchronized control point records");
- lvAsn = writeControlPointRecords(map);
- }
-
- controlPointDeferred = false;
- }
- else {
- lvAsn = writeControlPointRecords(map);
- if (lvAsn != -1L){
- return lvAsn;
- }
- }
-
- try {
- lvAsn = asn.getAndIncrement();
-
- // Write the control point interval and the ASN to the control point table
- lvCtrlPt = tLogControlPoint.doControlPoint(lvAsn);
-
- if ((lvCtrlPt - 5) > 0){ // We'll keep 5 control points of audit
- try {
- agedAsn = tLogControlPoint.getRecord(String.valueOf(lvCtrlPt - 5));
- if ((agedAsn > 0) && (lvCtrlPt % 5 == 0)){
- try {
- if (LOG.isTraceEnabled()) LOG.trace("Attempting to remove TLOG writes older than asn " + agedAsn);
- deleteAgedEntries(agedAsn);
- }
- catch (Exception e){
- LOG.error("deleteAgedEntries Exception " + e);
- throw e;
- }
- }
- try {
- tLogControlPoint.deleteAgedRecords(lvCtrlPt - 5);
- }
- catch (Exception e){
- if (LOG.isDebugEnabled()) LOG.debug("addControlPoint - control point record not found ");
- }
- }
- catch (IOException e){
- LOG.error("addControlPoint IOException");
- e.printStackTrace();
- throw e;
- }
- }
- } catch (Exception e){
- LOG.error("addControlPoint Exception " + e);
- e.printStackTrace();
- throw e;
- }
- if (LOG.isTraceEnabled()) LOG.trace("addControlPoint returning " + lvCtrlPt);
- return lvCtrlPt;
- }
-
- public void getTransactionState (TransactionState ts) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState start; transid: " + ts.getTransactionId());
-
- // This request might be for a transaction not originating on this node, so we need to open
- // the appropriate Tlog
- HTableInterface unknownTransactionTable;
- long lvTransid = ts.getTransactionId();
- int lv_ownerNid = (int)(lvTransid >> 32);
- int lv_lockIndex = (int)(lvTransid & tLogHashKey);
- String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(lv_ownerNid) + "_LOG_" + Integer.toHexString(lv_lockIndex));
- HConnection unknownTableConnection = HConnectionManager.createConnection(this.config);
- unknownTransactionTable = unknownTableConnection.getTable(TableName.valueOf(lv_tLogName));
-
- try {
- String transidString = new String(String.valueOf(lvTransid));
- Get g;
- long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
- if (LOG.isTraceEnabled()) LOG.trace("key: " + key + ", hexkey: " + Long.toHexString(key) + ", transid: " + lvTransid);
- g = new Get(Bytes.toBytes(key));
- TransState lvTxState = TransState.STATE_NOTX;
- String stateString = "";
- String transidToken = "";
- String commitIdToken = "";
- try {
- Result r = unknownTransactionTable.get(g);
- if (r == null) {
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog result is null: " + transidString);
- }
- if (r.isEmpty()) {
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog empty result: " + transidString);
- }
- byte [] value = r.getValue(TLOG_FAMILY, ASN_STATE);
- if (value == null) {
- ts.setStatus(TransState.STATE_NOTX);
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog value is null: " + transidString);
- return;
- }
- if (value.length == 0) {
- ts.setStatus(TransState.STATE_NOTX);
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog transaction not found: " + transidString);
- return;
- }
- ts.clearParticipatingRegions();
- String recordString = new String (Bytes.toString(value));
- StringTokenizer st = new StringTokenizer(recordString, ",");
- if (st.hasMoreElements()) {
- String asnToken = st.nextElement().toString();
- transidToken = st.nextElement().toString();
- stateString = st.nextElement().toString();
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: transaction: " + transidToken + " stateString is: " + stateString);
- }
- if (stateString.compareTo("COMMITTED") == 0){
- lvTxState = TransState.STATE_COMMITTED;
- }
- else if (stateString.compareTo("ABORTED") == 0){
- lvTxState = TransState.STATE_ABORTED;
- }
- else if (stateString.compareTo("ACTIVE") == 0){
- lvTxState = TransState.STATE_ACTIVE;
- }
- else if (stateString.compareTo("PREPARED") == 0){
- lvTxState = TransState.STATE_PREPARED;
- }
- else if (stateString.compareTo("NOTX") == 0){
- lvTxState = TransState.STATE_NOTX;
- }
- else if (stateString.compareTo("FORGOTTEN") == 0){
- // Need to get the previous state record so we know how to drive the regions
- String keyS = new String(r.getRow());
- Get get = new Get(r.getRow());
- get.setMaxVersions(versions); // will return last n versions of row
- Result lvResult = unknownTransactionTable.get(get);
- // byte[] b = lvResult.getValue(TLOG_FAMILY, ASN_STATE); // returns current version of value
- List<Cell> list = lvResult.getColumnCells(TLOG_FAMILY, ASN_STATE); // returns all versions of this column
- for (Cell element : list) {
- String stringValue = new String(CellUtil.cloneValue(element));
- st = new StringTokenizer(stringValue, ",");
- if (st.hasMoreElements()) {
- if (LOG.isTraceEnabled()) LOG.trace("Performing secondary search on (" + transidToken + ")");
- String asnToken = st.nextElement().toString() ;
- transidToken = st.nextElement().toString() ;
- String stateToken = st.nextElement().toString() ;
- if ((stateToken.compareTo("COMMITTED") == 0) || (stateToken.compareTo("ABORTED") == 0)) {
- String rowKey = new String(r.getRow());
- if (LOG.isTraceEnabled()) LOG.trace("Secondary search found record for (" + transidToken + ") with state: " + stateToken);
- lvTxState = (stateToken.compareTo("COMMITTED") == 0 ) ? TransState.STATE_COMMITTED : TransState.STATE_ABORTED;
- break;
- }
- else {
- if (LOG.isTraceEnabled()) LOG.trace("Secondary search skipping entry for (" +
- transidToken + ") with state: " + stateToken );
- }
- }
- }
- }
- else if (stateString.compareTo("ABORTING") == 0){
- lvTxState = TransState.STATE_ABORTING;
- }
- else if (stateString.compareTo("COMMITTING") == 0){
- lvTxState = TransState.STATE_COMMITTING;
- }
- else if (stateString.compareTo("PREPARING") == 0){
- lvTxState = TransState.STATE_PREPARING;
- }
- else if (stateString.compareTo("FORGETTING") == 0){
- lvTxState = TransState.STATE_FORGETTING;
- }
- else if (stateString.compareTo("FORGETTING_HEUR") == 0){
- lvTxState = TransState.STATE_FORGETTING_HEUR;
- }
- else if (stateString.compareTo("BEGINNING") == 0){
- lvTxState = TransState.STATE_BEGINNING;
- }
- else if (stateString.compareTo("HUNGCOMMITTED") == 0){
- lvTxState = TransState.STATE_HUNGCOMMITTED;
- }
- else if (stateString.compareTo("HUNGABORTED") == 0){
- lvTxState = TransState.STATE_HUNGABORTED;
- }
- else if (stateString.compareTo("IDLE") == 0){
- lvTxState = TransState.STATE_IDLE;
- }
- else if (stateString.compareTo("FORGOTTEN_HEUR") == 0){
- lvTxState = TransState.STATE_FORGOTTEN_HEUR;
- }
- else if (stateString.compareTo("ABORTING_PART2") == 0){
- lvTxState = TransState.STATE_ABORTING_PART2;
- }
- else if (stateString.compareTo("TERMINATING") == 0){
- lvTxState = TransState.STATE_TERMINATING;
- }
- else {
- lvTxState = TransState.STATE_BAD;
- }
-
- // get past the filler
- st.nextElement();
-
- commitIdToken = st.nextElement().toString();
- ts.setCommitId(Long.parseLong(commitIdToken));
-
- // Load the TransactionState object up with regions
- while (st.hasMoreElements()) {
- String tableNameToken = st.nextToken();
- HTable table = new HTable(config, tableNameToken);
- NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();
- Iterator<Map.Entry<HRegionInfo, ServerName>> it = regions.entrySet().iterator();
- while(it.hasNext()) { // iterate entries.
- NavigableMap.Entry<HRegionInfo, ServerName> pairs = it.next();
- HRegionInfo regionKey = pairs.getKey();
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: transaction: " + transidToken + " adding region: " + regionKey.getRegionNameAsString());
- ServerName serverValue = regions.get(regionKey);
- String hostAndPort = new String(serverValue.getHostAndPort());
- StringTokenizer tok = new StringTokenizer(hostAndPort, ":");
- String hostName = new String(tok.nextElement().toString());
- int portNumber = Integer.parseInt(tok.nextElement().toString());
- TransactionRegionLocation loc = new TransactionRegionLocation(regionKey, serverValue);
- ts.addRegion(loc);
- }
- }
- ts.setStatus(lvTxState);
-
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: returning transid: " + ts.getTransactionId() + " state: " + lvTxState);
- } catch (Exception e){
- LOG.error("getTransactionState Exception " + Arrays.toString(e.getStackTrace()));
- throw e;
- }
- }
- catch (Exception e2) {
- LOG.error("getTransactionState Exception2 " + e2);
- e2.printStackTrace();
- }
- if (LOG.isTraceEnabled()) LOG.trace("getTransactionState end transid: " + ts.getTransactionId());
- return;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java b/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java
deleted file mode 100644
index ab1305b..0000000
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java
+++ /dev/null
@@ -1,204 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.dtm;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-//H98import org.apache.hadoop.hbase.ipc.HMasterInterface;
-//H98import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
-//H98import org.apache.hadoop.hbase.ipc.HRegionInterface;
-
-
-public class TrafInfo {
-
- private HBaseAdmin hbadmin;
- private HConnection connection;
- Configuration config;
- // HMasterInterface hmaster;
-
- public TrafInfo() throws IOException {
- init();
- }
-
- public void init() throws IOException {
- this.config = HBaseConfiguration.create();
- this.connection = HConnectionManager.createConnection(config);
-
- try {
- hbadmin = new HBaseAdmin(config);
- //H98 hmaster = hbadmin.getMaster();
- } catch(Exception e) {
- System.out.println("ERROR: Unable to obtain HBase accessors, Exiting");
- e.printStackTrace();
- System.exit(1);
- }
- }
-
- public static void printHelp() {
- System.out.println("Run: $JAVA_HOME/bin/java org.trafodion.dtm.TrafInfo <command>");
- System.out.println("Commands to gather Transactional Region information:");
- System.out.println(" active :: active transactions per region");
- System.out.println(" committed :: committed transactions per region by sequence number");
- System.out.println(" indoubt :: in-doubt transactions per region");
- System.out.println(" <command> -v:: shows metadata tables");
- }
-
- public void getActivePendingTrans(String transType, String showmd){
- System.out.println("\n====================================================================");
- System.out.println("\t\tActive Pending Transactions");
- getTransactions(transType, showmd);
- }
-
- public void getCommittedTransactions(String transType, String showmd){
- System.out.println("\n====================================================================");
- System.out.println("\t\tCommitted Transactions by Sequence Number");
- getTransactions(transType, showmd);
- }
-
- public void getInDoubtTransactions(String transType, String showmd){
- System.out.println("\n====================================================================");
- System.out.println("\t\tIn-Doubt Transactions");
- getTransactions(transType, showmd);
- }
-
- public void getTransactions(String transType, String showmd){
- String regionName, tableName;
- int idx;
-
- /* H98
- Collection<ServerName> sn = hmaster.getClusterStatus().getServers();
- for(ServerName sname : sn) {
- System.out.println("===================================================================="
- + "\nServer Name: " + sname.toString() + "\n"
- + "\nTransId RegionId TableName");
-
- try {
-
- HRegionInterface regionServer = connection.getHRegionConnection(sname.getHostname(), sname.getPort());
- List<HRegionInfo> regions = regionServer.getOnlineRegions();
- connection.close();
-
- TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface)this.connection
- .getHRegionConnection(sname.getHostname(), sname.getPort());
-
- for (HRegionInfo rinfo: regions) {
-
- regionName = rinfo.getRegionNameAsString();
- idx = regionName.indexOf(',');
- tableName = regionName.substring(0, idx);
-
- if(!showmd.contains("-v")){
- if((tableName.contains("TRAFODION._MD_.")) || (tableName.contains("-ROOT-")) ||
- (tableName.equals(".META."))){
- continue;
- }
- }
- if(tableName.contains("TRAFODION._DTM_.")){
- continue;
- }
-
- System.out.println("--------------------------------------------------------------------"
- + "\n\t " + rinfo.getRegionId()
- + "\t" + tableName);
-
- if(transType.equals("active")){
- List<Long> result = transactionalRegionServer.getPendingTrans(rinfo.getRegionName());
- for(Long res : result)
- System.out.println(res);
- }
- else if(transType.equals("committed")){
- List<Long> result = transactionalRegionServer.getCommittedTrans(rinfo.getRegionName());
- for(Long res : result)
- System.out.println(res);
- }
- else if(transType.equals("indoubt")){
- List<Long> result = transactionalRegionServer.getInDoubtTrans(rinfo.getRegionName());
- for(Long res : result)
- System.out.println(res);
- }
- }
-
- } catch(Exception e) {
- System.out.println("ERROR: Unable to get region info, Exiting");
- e.printStackTrace();
- System.exit(1);
- }
- }
- */
-
-
- }
-
- public static void main(String[] args) throws IOException {
-
- if(args.length == 0) {
- TrafInfo.printHelp();
- System.exit(0);
- }
-
- TrafInfo ti = new TrafInfo();
-
- if(args.length == 1){
- if(args[0].equals("help"))
- TrafInfo.printHelp();
- else if(args[0].equals("active"))
- ti.getActivePendingTrans(args[0], "");
- else if(args[0].equals("committed"))
- ti.getCommittedTransactions(args[0], "");
- else if(args[0].equals("indoubt"))
- ti.getInDoubtTransactions(args[0], "");
- else {
- TrafInfo.printHelp();
- System.exit(0);
- }
- }
- // Verbose shows Metadata tables
- else if(args.length == 2){
- if(args[0].equals("active") && args[1].equals("-v"))
- ti.getActivePendingTrans(args[0], args[1]);
- else if(args[0].equals("committed") && args[1].equals("-v"))
- ti.getCommittedTransactions(args[0], args[1]);
- else if(args[0].equals("indoubt") && args[1].equals("-v"))
- ti.getInDoubtTransactions(args[0], args[1]);
- else {
- TrafInfo.printHelp();
- System.exit(0);
- }
- }
- else {
- TrafInfo.printHelp();
- System.exit(0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml b/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml
new file mode 100644
index 0000000..17f1fba
--- /dev/null
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+ <repositories>
+ <repository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </repository>
+ </repositories>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <hbase.version>0.98.1-cdh5.1.0</hbase.version>
+ <hbase-trx.id>hbase-trx-cdh5_3</hbase-trx.id>
+ <java.version>1.7</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase.client.transactional</groupId>
+ <artifactId>${hbase-trx.id}</artifactId>
+ <version>${env.TRAFODION_VER}</version>
+ <scope>system</scope>
+ <systemPath>${env.MY_SQROOT}/export/lib/${hbase-trx.id}-${env.TRAFODION_VER}.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ </dependencies>
+
+ <groupId>org.trafodion.sql</groupId>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>trafodion-dtm</artifactId>
+ <version>${env.TRAFODION_VER}</version>
+ <name>trafodion-dtm</name>
+ <description>Java code for the Trafodion DTM (Distributed Transaction Manager)</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.koraktor</groupId>
+ <artifactId>mavanagaiata</artifactId>
+ <version>0.7.2</version>
+ <configuration>
+ <dirtyFlag>false</dirtyFlag>
+ <dirtyIgnoreUntracked>false</dirtyIgnoreUntracked>
+ <gitDir>${env.MY_SQROOT}/../../.git</gitDir>
+ <dateFormat>ddMMMyyyy</dateFormat>
+ </configuration>
+ <executions>
+ <execution>
+ <id>git-commit</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>commit</goal>
+ <goal>branch</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Implementation-Version-1>Version ${project.version}</Implementation-Version-1>
+ <Implementation-Version-2>Release ${project.version}</Implementation-Version-2>
+ <Implementation-Version-3>Build ${env.SQ_BUILD_TYPE}</Implementation-Version-3>
+ <Implementation-Version-4>[${user.name}]</Implementation-Version-4>
+ <Implementation-Version-5>Branch ${mvngit.commit.abbrev}-${mvngit.branch}</Implementation-Version-5>
+ <Implementation-Version-6>Date ${maven.build.timestamp}</Implementation-Version-6>
+ <Product-Name>${project.name}</Product-Name>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
new file mode 100644
index 0000000..11120ef
--- /dev/null
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
@@ -0,0 +1,416 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.dtm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.lang.NullPointerException;
+
+public class HBaseAuditControlPoint {
+
+ static final Log LOG = LogFactory.getLog(HBaseAuditControlPoint.class);
+ private static long currControlPt;
+ private static HBaseAdmin admin;
+ private Configuration config;
+ private static String CONTROL_POINT_TABLE_NAME;
+ private static final byte[] CONTROL_POINT_FAMILY = Bytes.toBytes("cpf");
+ private static final byte[] ASN_HIGH_WATER_MARK = Bytes.toBytes("hwm");
+ private static HTable table;
+ private boolean useAutoFlush;
+ private boolean disableBlockCache;
+
+ public HBaseAuditControlPoint(Configuration config) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("Enter HBaseAuditControlPoint constructor()");
+ this.config = config;
+ CONTROL_POINT_TABLE_NAME = config.get("CONTROL_POINT_TABLE_NAME");
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(CONTROL_POINT_TABLE_NAME));
+ HColumnDescriptor hcol = new HColumnDescriptor(CONTROL_POINT_FAMILY);
+
+ disableBlockCache = false;
+ try {
+ String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
+ if (blockCacheString != null){
+ disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
+ if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null");
+ }
+ }
+ catch (Exception e) {
+ if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not in ms.env");
+ }
+ LOG.info("disableBlockCache is " + disableBlockCache);
+ if (disableBlockCache) {
+ hcol.setBlockCacheEnabled(false);
+ }
+
+ desc.addFamily(hcol);
+ admin = new HBaseAdmin(config);
+
+ useAutoFlush = true;
+ try {
+ String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH");
+ if (autoFlush != null){
+ useAutoFlush = (Integer.parseInt(autoFlush) != 0);
+ if (LOG.isDebugEnabled()) LOG.debug("autoFlush != null");
+ }
+ }
+ catch (Exception e) {
+ if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AUTO_FLUSH is not in ms.env");
+ }
+ LOG.info("useAutoFlush is " + useAutoFlush);
+
+ boolean lvControlPointExists = admin.tableExists(CONTROL_POINT_TABLE_NAME);
+ if (LOG.isDebugEnabled()) LOG.debug("HBaseAuditControlPoint lvControlPointExists " + lvControlPointExists);
+ currControlPt = -1;
+ if (lvControlPointExists == false) {
+ try {
+ if (LOG.isDebugEnabled()) LOG.debug("Creating the table " + CONTROL_POINT_TABLE_NAME);
+ admin.createTable(desc);
+ currControlPt = 1;
+ }
+ catch (TableExistsException e) {
+ LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists");
+ }
+ }
+ try {
+ if (LOG.isDebugEnabled()) LOG.debug("try new HTable");
+ table = new HTable(config, desc.getName());
+ table.setAutoFlushTo(this.useAutoFlush);
+ }
+ catch (IOException e) {
+ LOG.error("new HTable IOException");
+ }
+
+ if (currControlPt == -1){
+ try {
+ currControlPt = getCurrControlPt();
+ }
+ catch (Exception e2) {
+ if (LOG.isDebugEnabled()) LOG.debug("Exit getCurrControlPoint() exception " + e2);
+ }
+ }
+ if (LOG.isDebugEnabled()) LOG.debug("currControlPt is " + currControlPt);
+
+ if (LOG.isTraceEnabled()) LOG.trace("Exit constructor()");
+ return;
+ }
+
+ public long getCurrControlPt() throws Exception {
+ if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt: start");
+ long highKey = -1;
+ if (LOG.isDebugEnabled()) LOG.debug("new Scan");
+ Scan s = new Scan();
+ s.setCaching(10);
+ s.setCacheBlocks(false);
+ if (LOG.isDebugEnabled()) LOG.debug("resultScanner");
+ ResultScanner ss = table.getScanner(s);
+ try {
+ long currKey;
+ String rowKey;
+ if (LOG.isDebugEnabled()) LOG.debug("entering for loop" );
+ for (Result r : ss) {
+ rowKey = new String(r.getRow());
+ if (LOG.isDebugEnabled()) LOG.debug("rowKey is " + rowKey );
+ currKey = Long.parseLong(rowKey);
+ if (LOG.isDebugEnabled()) LOG.debug("value is " + Long.parseLong(Bytes.toString(r.value())));
+ if (currKey > highKey) {
+ if (LOG.isDebugEnabled()) LOG.debug("Setting highKey to " + currKey);
+ highKey = currKey;
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.error("getCurrControlPt IOException" + e);
+ e.printStackTrace();
+ } finally {
+ ss.close();
+ }
+ if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt returning " + highKey);
+ return highKey;
+ }
+
+ public long putRecord(final long ControlPt, final long startingSequenceNumber) throws Exception {
+ if (LOG.isTraceEnabled()) LOG.trace("putRecord starting sequence number (" + String.valueOf(startingSequenceNumber) + ")");
+ String controlPtString = new String(String.valueOf(ControlPt));
+ Put p = new Put(Bytes.toBytes(controlPtString));
+ p.add(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK, Bytes.toBytes(String.valueOf(startingSequenceNumber)));
+ try {
+ if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber);
+ table.put(p);
+ if (useAutoFlush == false) {
+ if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
+ table.flushCommits();
+ }
+ }
+ catch (Exception e) {
+ LOG.error("HBaseAuditControlPoint:putRecord Exception" + e);
+ throw e;
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("HBaseAuditControlPoint:putRecord returning " + ControlPt);
+ return ControlPt;
+ }
+
+ public ArrayList<String> getRecordList(String controlPt) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("getRecord");
+ ArrayList<String> transactionList = new ArrayList<String>();
+ Get g = new Get(Bytes.toBytes(controlPt));
+ Result r = table.get(g);
+ byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
+ String recordString = new String(currValue);
+ if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
+ StringTokenizer st = new StringTokenizer(recordString, ",");
+ while (st.hasMoreElements()) {
+ String token = st.nextElement().toString() ;
+ if (LOG.isDebugEnabled()) LOG.debug("token is " + token);
+ transactionList.add(token);
+ }
+
+ if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit with list size (" + transactionList.size() + ")");
+ return transactionList;
+
+ }
+
+ public long getRecord(final String controlPt) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("getRecord " + controlPt);
+ long lvValue = -1;
+ Get g = new Get(Bytes.toBytes(controlPt));
+ String recordString;
+ try {
+ Result r = table.get(g);
+ byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
+ try {
+ recordString = new String (Bytes.toString(currValue));
+ if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
+ lvValue = Long.parseLong(recordString, 10);
+ }
+ catch (NullPointerException e){
+ if (LOG.isDebugEnabled()) LOG.debug("control point " + controlPt + " is not in the table");
+ }
+ }
+ catch (IOException e){
+ LOG.error("getRecord IOException");
+ throw e;
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit " + lvValue);
+ return lvValue;
+
+ }
+
+ public long getStartingAuditSeqNum() throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum");
+ String controlPtString = new String(String.valueOf(currControlPt));
+ long lvAsn;
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum new get for control point " + currControlPt);
+ Get g = new Get(Bytes.toBytes(controlPtString));
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum setting result");
+ Result r = table.get(g);
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum currValue CONTROL_POINT_FAMILY is "
+ + CONTROL_POINT_FAMILY + " ASN_HIGH_WATER_MARK " + ASN_HIGH_WATER_MARK);
+ byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
+ if (LOG.isDebugEnabled()) LOG.debug("Starting asn setting recordString ");
+ String recordString = "";
+ try {
+ recordString = new String(currValue);
+ }
+ catch (NullPointerException e) {
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is null");
+ lvAsn = 1;
+ if (LOG.isDebugEnabled()) LOG.debug("Starting asn is 1");
+ return lvAsn;
+ }
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is good");
+ if (LOG.isDebugEnabled()) LOG.debug("Starting asn for control point " + currControlPt + " is " + recordString);
+ lvAsn = Long.valueOf(recordString);
+ if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum - exit returning " + lvAsn);
+ return lvAsn;
+ }
+
+ public long getNextAuditSeqNum(int nid) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum for node: " + nid);
+
+ // We need to open the appropriate control point table and read the value from it
+ HTableInterface remoteTable;
+ String lv_tName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(nid) + "_CONTROL_POINT");
+ HConnection remoteConnection = HConnectionManager.createConnection(this.config);
+ remoteTable = remoteConnection.getTable(TableName.valueOf(lv_tName));
+
+ long highValue = -1;
+ try {
+ Scan s = new Scan();
+ s.setCaching(10);
+ s.setCacheBlocks(false);
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum resultScanner");
+ ResultScanner ss = remoteTable.getScanner(s);
+ try {
+ long currValue;
+ String rowKey;
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum entering for loop" );
+ for (Result r : ss) {
+ rowKey = new String(r.getRow());
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum rowKey is " + rowKey );
+ currValue = Long.parseLong(Bytes.toString(r.value()));
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum value is " + currValue);
+ if (currValue > highValue) {
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum Setting highValue to " + currValue);
+ highValue = currValue;
+ }
+ }
+ }
+ catch (Exception e) {
+ LOG.error("getNextAuditSeqNum IOException" + e);
+ e.printStackTrace();
+ } finally {
+ ss.close();
+ }
+ }
+ catch (IOException e) {
+ LOG.error("getNextAuditSeqNum IOException setting up scan for " + lv_tName);
+ e.printStackTrace();
+ }
+ finally {
+ try {
+ remoteTable.close();
+ remoteConnection.close();
+ }
+ catch (IOException e) {
+ LOG.error("getNextAuditSeqNum IOException closing table or connection for " + lv_tName);
+ e.printStackTrace();
+ }
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum returning " + (highValue + 1));
+ return (highValue + 1);
+ }
+
+
+ public long doControlPoint(final long sequenceNumber) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("doControlPoint start");
+ try {
+ currControlPt++;
+ if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord");
+ putRecord(currControlPt, sequenceNumber);
+ }
+ catch (Exception e) {
+ LOG.error("doControlPoint Exception" + e);
+ }
+
+ if (LOG.isTraceEnabled()) LOG.trace("doControlPoint - exit");
+ return currControlPt;
+ }
+
+ public boolean deleteRecord(final long controlPoint) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start for control point " + controlPoint);
+ String controlPtString = new String(String.valueOf(controlPoint));
+
+ try {
+ List<Delete> list = new ArrayList<Delete>();
+ Delete del = new Delete(Bytes.toBytes(controlPtString));
+ if (LOG.isDebugEnabled()) LOG.debug("deleteRecord (" + controlPtString + ") ");
+ table.delete(del);
+ }
+ catch (Exception e) {
+ LOG.error("deleteRecord IOException");
+ }
+
+ if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit");
+ return true;
+ }
+
+ public boolean deleteAgedRecords(final long controlPoint) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords start - control point " + controlPoint);
+ String controlPtString = new String(String.valueOf(controlPoint));
+
+ Scan s = new Scan();
+ s.setCaching(10);
+ s.setCacheBlocks(false);
+ ArrayList<Delete> deleteList = new ArrayList<Delete>();
+ ResultScanner ss = table.getScanner(s);
+ try {
+ String rowKey;
+ for (Result r : ss) {
+ rowKey = new String(r.getRow());
+ if (Long.parseLong(rowKey) < controlPoint) {
+ if (LOG.isDebugEnabled()) LOG.debug("Adding (" + rowKey + ") to delete list");
+ Delete del = new Delete(rowKey.getBytes());
+ deleteList.add(del);
+ }
+ }
+ if (LOG.isDebugEnabled()) LOG.debug("attempting to delete list with " + deleteList.size() + " elements");
+ table.delete(deleteList);
+ }
+ catch (Exception e) {
+ LOG.error("deleteAgedRecords IOException");
+ }finally {
+ ss.close();
+ }
+
+ if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords - exit");
+ return true;
+ }
+}
+