You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2016/05/18 18:05:47 UTC
[1/4] incubator-trafodion git commit: [TRAFODION-1988] Better Java
exception handling in Trafodion
Repository: incubator-trafodion
Updated Branches:
refs/heads/master 71edb095a -> 33bea22b1
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java b/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java
index fa33ab1..4e4af19 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java
@@ -520,7 +520,6 @@ public class HBaseClient {
metaColDesc.setInMemory(true);
desc.addFamily(metaColDesc);
HBaseAdmin admin = new HBaseAdmin(config);
- try {
if (beginEndKeys != null && beginEndKeys.length > 0)
{
byte[][] keys = new byte[beginEndKeys.length][];
@@ -542,12 +541,6 @@ public class HBaseClient {
admin.createTable(desc);
}
}
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("HbaseClient.createk : createTable error" + e);
- throw e;
- }
admin.close();
return true;
}
@@ -555,15 +548,9 @@ public class HBaseClient {
public boolean registerTruncateOnAbort(String tblName, long transID)
throws MasterNotRunningException, IOException {
- try {
if(transID != 0) {
table.truncateTableOnAbort(tblName, transID);
}
- }
- catch (IOException e) {
- if (logger.isDebugEnabled()) logger.debug("HbaseClient.registerTruncateOnAbort error" + e);
- throw e;
- }
return true;
}
@@ -637,7 +624,6 @@ public class HBaseClient {
setDescriptors(tableOptions,htblDesc /*out*/,colDesc /*out*/, defaultVersionsValue);
}
- try {
if (transID != 0) {
// Transactional alter support
table.alter(tblName, tableOptions, transID);
@@ -657,12 +643,6 @@ public class HBaseClient {
}
admin.close();
}
- }
- catch (IOException e) {
- if (logger.isDebugEnabled()) logger.debug("HbaseClient.drop error" + e);
- throw e;
- }
-
cleanupCache(tblName);
return true;
}
@@ -671,26 +651,20 @@ public class HBaseClient {
throws MasterNotRunningException, IOException {
if (logger.isDebugEnabled()) logger.debug("HBaseClient.drop(" + tblName + ") called.");
HBaseAdmin admin = new HBaseAdmin(config);
- // admin.disableTableAsync(tblName);
-
try {
if(transID != 0) {
table.dropTable(tblName, transID);
}
else {
- if (! admin.isTableEnabled(tblName))
- admin.enableTable(tblName);
- admin.disableTable(tblName);
+ if (admin.isTableEnabled(tblName))
+ admin.disableTable(tblName);
admin.deleteTable(tblName);
- admin.close();
}
+ cleanupCache(tblName);
+ } finally {
+ admin.close();
}
- catch (IOException e) {
- if (logger.isDebugEnabled()) logger.debug("HbaseClient.drop error" + e);
- throw e;
- }
-
- return cleanupCache(tblName);
+ return true;
}
public boolean dropAll(String pattern, long transID)
@@ -702,6 +676,7 @@ public class HBaseClient {
if (htdl == null) // no tables match the given pattern.
return true;
+ IOException ioExc = null;
for (HTableDescriptor htd : htdl) {
String tblName = htd.getNameAsString();
@@ -709,7 +684,6 @@ public class HBaseClient {
int idx = tblName.indexOf("TRAFODION._DTM_");
if (idx == 0)
continue;
-
try {
if(transID != 0) {
// System.out.println("tblName " + tblName);
@@ -725,15 +699,20 @@ public class HBaseClient {
}
catch (IOException e) {
+ if (ioExc == null) {
+ ioExc = new IOException("Not all tables are dropped, For details get suppressed exceptions");
+ ioExc.addSuppressed(e);
+ }
+ else
+ ioExc.addSuppressed(e);
if (logger.isDebugEnabled()) logger.debug("HbaseClient.dropAll error" + e);
- throw e;
}
-
cleanupCache(tblName);
}
admin.close();
- // return cleanup();
+ if (ioExc != null)
+ throw ioExc;
return true;
}
@@ -742,7 +721,6 @@ public class HBaseClient {
if (logger.isDebugEnabled()) logger.debug("HBaseClient.listAll(" + pattern + ") called.");
HBaseAdmin admin = new HBaseAdmin(config);
-
HTableDescriptor[] htdl =
(pattern.isEmpty() ? admin.listTables() : admin.listTables(pattern));
byte[][] hbaseTables = new byte[htdl.length][];
@@ -1492,17 +1470,8 @@ public class HBaseClient {
{
if (logger.isDebugEnabled()) logger.debug("HBaseClient.getHBulkLoadClient() called.");
HBulkLoadClient hblc = null;
- try
- {
- hblc = new HBulkLoadClient( config);
-
- if (hblc == null)
- throw new IOException ("hbkc is null");
- }
- catch (IOException e)
- {
- return null;
- }
+
+ hblc = new HBulkLoadClient( config);
return hblc;
@@ -1539,21 +1508,14 @@ public class HBaseClient {
admin = null;
return latestsnpName;
}
- public boolean cleanSnpScanTmpLocation(String pathStr) throws Exception
+ public boolean cleanSnpScanTmpLocation(String pathStr) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("HbaseClient.cleanSnpScanTmpLocation() - start - Path: " + pathStr);
- try
- {
+
Path delPath = new Path(pathStr );
delPath = delPath.makeQualified(delPath.toUri(), null);
FileSystem fs = FileSystem.get(delPath.toUri(),config);
fs.delete(delPath, true);
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("HbaseClient.cleanSnpScanTmpLocation() --exception:" + e);
- throw e;
- }
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java b/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java
index 31d8cac..0635ace 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java
@@ -183,11 +183,6 @@ public class HBulkLoadClient
{
if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doCreateHFile() called.");
- if (hFileLocation == null )
- throw new NullPointerException(hFileLocation + " is not set");
- if (hFileName == null )
- throw new NullPointerException(hFileName + " is not set");
-
closeHFile();
if (fileSys == null)
@@ -198,26 +193,18 @@ public class HBulkLoadClient
if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.createHFile Path: " + hfilePath);
- try
- {
- HFileContext hfileContext = new HFileContextBuilder()
+ HFileContext hfileContext = new HFileContextBuilder()
.withBlockSize(blockSize)
.withCompression(Compression.getCompressionAlgorithmByName(compression))
.withDataBlockEncoding(dataBlockEncoding)
.build();
- writer = HFile.getWriterFactory(config, new CacheConfig(config))
+ writer = HFile.getWriterFactory(config, new CacheConfig(config))
.withPath(fileSys, hfilePath)
.withFileContext(hfileContext)
.withComparator(KeyValue.COMPARATOR)
.create();
- if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.createHFile Path: " + writer.getPath() + "Created");
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doCreateHFile Exception" + e.getMessage());
- throw e;
- }
+ if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.createHFile Path: " + writer.getPath() + "Created");
return true;
}
@@ -317,12 +304,6 @@ public class HBulkLoadClient
}
}
admin.snapshot(snapshotName, tableName);
- }
- catch (Exception e)
- {
- //log exeception and throw the exception again to teh parent
- if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.createSnapshot() - Exception: " + e);
- throw e;
}
finally
{
@@ -334,7 +315,7 @@ public class HBulkLoadClient
}
private boolean restoreSnapshot( String snapshotName, String tableName)
- throws IOException, RestoreSnapshotException, Exception
+ throws IOException, RestoreSnapshotException
{
HBaseAdmin admin = null;
try
@@ -347,19 +328,12 @@ public class HBulkLoadClient
admin.enableTable(tableName);
}
- catch (Exception e)
- {
- //log exeception and throw the exception again to the parent
- if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.restoreSnapshot() - Exception: " + e);
- throw e;
- }
finally
{
//close HBaseAdmin instance
if (admin != null)
admin.close();
}
-
return true;
}
private boolean deleteSnapshot( String snapshotName, String tableName)
@@ -391,12 +365,6 @@ public class HBulkLoadClient
admin.enableTable(tableName);
admin.deleteSnapshot(snapshotName);
}
- catch (Exception e)
- {
- //log exeception and throw the exception again to the parent
- if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.restoreSnapshot() - Exception: " + e);
- throw e;
- }
finally
{
//close HBaseAdmin instance
@@ -464,7 +432,7 @@ public class HBulkLoadClient
if (quasiSecure)
{
- throw new Exception("HBulkLoadClient.doBulkLoad() - cannot perform load. Trafodion on secure HBase mode is not implemented yet");
+ throw new UnsupportedOperationException("HBulkLoadClient.doBulkLoad() - cannot perform load. Trafodion on secure HBase mode is not implemented yet");
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
index 2b7ad66..9edb4db 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
@@ -161,21 +161,25 @@ public class HTableClient {
return null;
return snpDesc.getName();
}
+
void setSnapRestorePath() throws IOException
{
String restoreDirStr = tmpLocation + getSnapshotDescription().getName(); ;
snapRestorePath = new Path(restoreDirStr);
snapRestorePath = snapRestorePath.makeQualified(fs.getUri(), snapRestorePath);
}
+
Path getSnapRestorePath() throws IOException
{
return snapRestorePath;
}
+
boolean snapshotExists() throws IOException
{
if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.snapshotExists() called. ");
return !admin.listSnapshots(snpDesc.getName()).isEmpty();
}
+
void deleteSnapshot() throws IOException
{
if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.deleteSnapshot() called. ");
@@ -203,20 +207,23 @@ public class HTableClient {
}
}
- void createTableSnapshotScanner(int timeout, int slp, long nbre, Scan scan) throws InterruptedException
+ void createTableSnapshotScanner(int timeout, int slp, long nbre, Scan scan) throws InterruptedException, IOException
{
if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.createTableSnapshotScanner() called. ");
int xx=0;
+ IOException ioExc = null;
while (xx < timeout)
{
- xx++;
+ xx++;
scanner = null;
try
{
+ ioExc = null;
scanner = new TableSnapshotScanner(table.getConfiguration(), snapHelper.getSnapRestorePath(), snapHelper.getSnapshotName(), scan);
}
catch(IOException e )
{
+ ioExc = e;
if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.createTableSnapshotScanner(). espNumber: " + nbre +
" snapshot " + snpDesc.getName() + " TableSnapshotScanner Exception :" + e);
Thread.sleep(slp);
@@ -226,7 +233,10 @@ public class HTableClient {
nbre + " snapshot " + snpDesc.getName() + " TableSnapshotScanner Done - Scanner:" + scanner );
break;
}
+ if (ioExc != null)
+ throw ioExc;
}
+
void setSnapshotDescription( String snapName)
{
if (snapName == null )
@@ -994,11 +1004,9 @@ public class HTableClient {
" snapshot name: " + snapHelper.getSnapshotName());
if (!snapHelper.snapshotExists())
- throw new Exception ("Snapshot " + snapHelper.getSnapshotName() + " does not exist.");
+ throw new IOException ("Snapshot " + snapHelper.getSnapshotName() + " does not exist.");
snapHelper.createTableSnapshotScanner(snapTimeout, 5, espNum, scan);
- if (scanner==null)
- throw new Exception("Cannot create Table Snapshot Scanner");
}
if (useSnapshotScan)
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/HiveClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HiveClient.java b/core/sql/src/main/java/org/trafodion/sql/HiveClient.java
index 3ab6ef8..b6471a6 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HiveClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HiveClient.java
@@ -174,7 +174,8 @@ public class HiveClient {
// Because Hive changed the name of the class containing internal constants changed
// in Hive 0.10, we are using Java Reflection to get the value of the DDL_TIME constant.
public static String getDDLTimeConstant()
- throws MetaException {
+ throws MetaException
+ {
Class constsClass = null;
Object constsFromReflection = null;
@@ -208,17 +209,16 @@ public class HiveClient {
} catch (ClassNotFoundException e) {
throw new MetaException("Could not find Hive Metastore constants class");
}
-
// Using Java reflection, get a reference to the DDL_TIME field
- try {
+ try {
ddlTimeField = constsClass.getField("DDL_TIME");
} catch (NoSuchFieldException e) {
throw new MetaException("Could not find DDL_TIME constant field");
}
-
// get the String object that represents the value of this field
- try {
+ try {
fieldVal = ddlTimeField.get(constsFromReflection);
+
} catch (IllegalAccessException e) {
throw new MetaException("Could not get value for DDL_TIME constant field");
}
@@ -240,20 +240,12 @@ public class HiveClient {
return true;
}
- boolean hdfsWrite(byte[] buff, long len) throws Exception
+ boolean hdfsWrite(byte[] buff, long len) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("HiveClient.hdfsWrite() - started" );
- try
- {
- fsOut.write(buff);
- fsOut.flush();
- }
- catch (Exception e)
- {
- if (logger.isDebugEnabled()) logger.debug("HiveClient.hdfsWrite() -- exception: " + e);
- throw e;
- }
+ fsOut.write(buff);
+ fsOut.flush();
if (logger.isDebugEnabled()) logger.debug("HiveClient.hdfsWrite() - bytes written and flushed:" + len );
return true;
@@ -262,40 +254,15 @@ public class HiveClient {
boolean hdfsClose() throws IOException
{
if (logger.isDebugEnabled()) logger.debug("HiveClient.hdfsClose() - started" );
- try
- {
- fsOut.close();
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("HiveClient.hdfsClose() - exception:" + e);
- throw e;
- }
+ fsOut.close();
return true;
}
public void executeHiveSQL(String ddl) throws ClassNotFoundException, SQLException
{
- try
- {
- Class.forName("org.apache.hive.jdbc.HiveDriver");
- }
-
- catch(ClassNotFoundException e)
- {
- throw e;
- }
-
- try
- {
- Connection con = DriverManager.getConnection("jdbc:hive2://", "hive", "");
- Statement stmt = con.createStatement();
- stmt.execute(ddl);
- }
-
- catch(SQLException e)
- {
- throw e;
- }
+ Class.forName("org.apache.hive.jdbc.HiveDriver");
+ Connection con = DriverManager.getConnection("jdbc:hive2://", "hive", "");
+ Statement stmt = con.createStatement();
+ stmt.execute(ddl);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/ResultIterator.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/ResultIterator.java b/core/sql/src/main/java/org/trafodion/sql/ResultIterator.java
deleted file mode 100644
index fccda24..0000000
--- a/core/sql/src/main/java/org/trafodion/sql/ResultIterator.java
+++ /dev/null
@@ -1,133 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.sql;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-
-
-public class ResultIterator {
- ResultScanner scanner;
- Result[] resultSet;
- Result row = null;
- scanFetchStep step;
- List<KeyValue> kvList;
- int listIndex = 0;
- int cellIndex;
- int numKVs;
- boolean isSingleRow = false;
-
- private enum scanFetchStep {
- SCAN_FETCH_NEXT_ROW,
- SCAN_FETCH_NEXT_COL,
- SCAN_FETCH_CLOSE
- } ;
-
- public ResultIterator(ResultScanner scanner) {
- this.scanner = scanner;
- resultSet = null;
- step = scanFetchStep.SCAN_FETCH_NEXT_ROW;
- }
-
- public ResultIterator(Result[] results) {
- this.scanner = null;
- resultSet = results;
- step = scanFetchStep.SCAN_FETCH_NEXT_ROW;
- }
-
- public ResultIterator(Result result) {
- this.scanner = null;
- resultSet = null;
- row = result;
- isSingleRow = true;
- step = scanFetchStep.SCAN_FETCH_NEXT_ROW;
- }
-
- KeyValue nextCell() throws IOException {
- while (true)
- {
- switch (step)
- {
- case SCAN_FETCH_NEXT_ROW:
- {
- if (isSingleRow == false) {
- if (scanner != null)
- row = scanner.next();
- else {
- if (listIndex == resultSet.length) {
- step = scanFetchStep.SCAN_FETCH_CLOSE;
- break;
- }
- row = resultSet[listIndex];
- listIndex++;
- }
- }
-
- if (row == null || row.isEmpty()) {
- step = scanFetchStep.SCAN_FETCH_CLOSE;
- break;
- }
-
- kvList = row.list();
- cellIndex = 0;
- numKVs = kvList.size();
-
- step = scanFetchStep.SCAN_FETCH_NEXT_COL;
- }
- break;
-
- case SCAN_FETCH_NEXT_COL:
- {
- KeyValue kv = kvList.get(cellIndex);
- cellIndex++;
- if (kv == null) {
- if (isSingleRow)
- step = scanFetchStep.SCAN_FETCH_CLOSE;
- else
- step = scanFetchStep.SCAN_FETCH_NEXT_ROW;
- break;
- }
-
- if (cellIndex == numKVs)
- if (isSingleRow)
- step = scanFetchStep.SCAN_FETCH_CLOSE;
- else
- step = scanFetchStep.SCAN_FETCH_NEXT_ROW;
-
- return kv;
- }
-
- case SCAN_FETCH_CLOSE:
- {
- return null;
- }
-
- }// switch
- } // while
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/ResultKeyValueList.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/ResultKeyValueList.java b/core/sql/src/main/java/org/trafodion/sql/ResultKeyValueList.java
deleted file mode 100644
index 6332070..0000000
--- a/core/sql/src/main/java/org/trafodion/sql/ResultKeyValueList.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.sql;
-
-import java.util.List;
-import java.io.*;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Result;
-import java.nio.*;
-
-public class ResultKeyValueList {
- Result result;
- List<KeyValue> kvList;
-
- public ResultKeyValueList(Result result) {
- super();
- this.result = result;
- kvList = result.list();
- }
-
- byte[] getRowID() {
- if (result == null)
- return null;
- else
- return result.getRow();
- }
-
- byte[] getAllKeyValues() {
- if (kvList == null)
- return null;
- int numCols = kvList.size();
- byte[] rowID = result.getRow();
- int bufSize = rowID.length;
- bufSize += (64 * numCols);
- for (int i=0; i<numCols; i++) {
- bufSize += kvList.get(i).getLength();
- }
- ByteBuffer buf = ByteBuffer.allocate(bufSize);
- buf.order(ByteOrder.LITTLE_ENDIAN);
- // move in numCols
- buf.putInt(numCols);
- // move in rowID length and rowID
- buf.putInt(rowID.length);
- buf.put(rowID);;
- // move in all descriptors
- for (int i=0; i<numCols; i++) {
- copyKVs(buf, kvList.get(i));
- }
- return buf.array();
- }
-
- void copyKVs(ByteBuffer buf, KeyValue kv)
- {
- buf.putInt(kv.getLength());
- int offset = kv.getOffset();
- buf.putInt(kv.getValueLength());
- buf.putInt(kv.getValueOffset() - offset);
- buf.putInt(kv.getQualifierLength());
- buf.putInt(kv.getQualifierOffset() - offset);
- buf.putInt(kv.getFamilyLength());
- buf.putInt(kv.getFamilyOffset() - offset);
- buf.putLong(kv.getTimestamp());
- buf.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
- }
-
-
- int getSize() {
- if (kvList == null)
- return 0;
- else
- return kvList.size();
- }
-
- KeyValue getEntry(int i) {
- if (kvList == null)
- return null;
- else
- return kvList.get(i);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/RowToInsert.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/RowToInsert.java b/core/sql/src/main/java/org/trafodion/sql/RowToInsert.java
deleted file mode 100644
index 7bb53c3..0000000
--- a/core/sql/src/main/java/org/trafodion/sql/RowToInsert.java
+++ /dev/null
@@ -1,44 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.sql;
-
-import java.util.Vector;
-
-public class RowToInsert extends Vector<RowToInsert.ColToInsert> {
-
- public class ColToInsert {
- public byte[] qualName;
- public byte[] colValue;
- }
-
- private static final long serialVersionUID = 5066470006717527862L;
-
- public void addColumn(byte[] name, byte[] value) {
- ColToInsert col = new ColToInsert();
- col.qualName = name;
- col.colValue = value;
- add(col);
- }
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/RowsToInsert.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/RowsToInsert.java b/core/sql/src/main/java/org/trafodion/sql/RowsToInsert.java
deleted file mode 100644
index 594fe61..0000000
--- a/core/sql/src/main/java/org/trafodion/sql/RowsToInsert.java
+++ /dev/null
@@ -1,57 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.sql;
-
-import java.util.Vector;
-
-public class RowsToInsert extends Vector<RowsToInsert.RowInfo> {
-
- public class RowInfo {
- public byte[] rowId;
- public Vector<RowsToInsert.ColToInsert> columns;
- }
-
- public class ColToInsert {
- public byte[] qualName;
- public byte[] colValue;
- }
-
- private static final long serialVersionUID = 5066470006717527863L;
-
- public void addRowId(byte[] rowId) {
- RowInfo rowInfo = new RowInfo();
- rowInfo.rowId = rowId;
- rowInfo.columns = new Vector<RowsToInsert.ColToInsert>();
- rowInfo.columns.clear();
- add(rowInfo);
- }
-
- public void addColumn(byte[] name, byte[] value) {
- ColToInsert col = new ColToInsert();
- col.qualName = name;
- col.colValue = value;
- if (size() > 0)
- get(size()-1).columns.add(col);
- // RowInfo.columns.add(col);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java b/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
index 0950431..a47c2f4 100644
--- a/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
+++ b/core/sql/src/main/java/org/trafodion/sql/SequenceFileWriter.java
@@ -191,69 +191,38 @@ public class SequenceFileWriter {
{
GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, conf);
Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
- try
- {
- outStream = gzipCodec.createOutputStream(fsOut, gzipCompressor);
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() --exception :" + e);
- throw e;
- }
+ outStream = gzipCodec.createOutputStream(fsOut, gzipCompressor);
}
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCreate() - compressed output stream created" );
return true;
}
- boolean hdfsWrite(byte[] buff, long len) throws Exception,OutOfMemoryError
+ boolean hdfsWrite(byte[] buff, long len) throws IOException
+ //,OutOfMemoryError
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - started" );
- try
- {
- outStream.write(buff);
- outStream.flush();
- }
- catch (Exception e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() -- exception: " + e);
- throw e;
- }
- catch (OutOfMemoryError e1)
- {
- logger.debug("SequenceFileWriter.hdfsWrite() -- OutOfMemory Error: " + e1);
- throw e1;
- }
+ outStream.write(buff);
+ outStream.flush();
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsWrite() - bytes written and flushed:" + len );
-
return true;
}
boolean hdfsClose() throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsClose() - started" );
- try
- {
- outStream.close();
- fsOut.close();
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsClose() - exception:" + e);
- throw e;
- }
+ outStream.close();
+ fsOut.close();
return true;
}
- public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws Exception
+ public boolean hdfsMergeFiles(String srcPathStr, String dstPathStr) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - start");
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - source Path: " + srcPathStr +
", destination File:" + dstPathStr );
- try
- {
Path srcPath = new Path(srcPathStr );
srcPath = srcPath.makeQualified(srcPath.toUri(), null);
FileSystem srcFs = FileSystem.get(srcPath.toUri(),conf);
@@ -286,24 +255,15 @@ public class SequenceFileWriter {
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() - delete intermediate files" );
srcFs.delete(tmpSrcPath, true);
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsMergeFiles() --exception:" + e);
- throw e;
- }
-
-
return true;
}
+
public boolean hdfsCleanUnloadPath(String uldPathStr
- /*, boolean checkExistence, String mergeFileStr*/) throws Exception
+ /*, boolean checkExistence, String mergeFileStr*/) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - start");
logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() - unload Path: " + uldPathStr );
- try
- {
Path uldPath = new Path(uldPathStr );
uldPath = uldPath.makeQualified(uldPath.toUri(), null);
FileSystem srcFs = FileSystem.get(uldPath.toUri(),conf);
@@ -320,23 +280,14 @@ public class SequenceFileWriter {
for (Path f : files){
srcFs.delete(f, false);
}
- }
- catch (IOException e)
- {
- logger.debug("SequenceFileWriter.hdfsCleanUnloadPath() -exception:" + e);
- throw e;
- }
-
return true;
}
- public boolean hdfsExists(String filePathStr) throws Exception
+ public boolean hdfsExists(String filePathStr) throws IOException
{
logger.debug("SequenceFileWriter.hdfsExists() - start");
logger.debug("SequenceFileWriter.hdfsExists() - Path: " + filePathStr);
- try
- {
//check existence of the merge Path
Path filePath = new Path(filePathStr );
filePath = filePath.makeQualified(filePath.toUri(), null);
@@ -347,72 +298,43 @@ public class SequenceFileWriter {
+ filePath + " exists" );
return true;
}
-
- } catch (IOException e) {
- logger.debug("SequenceFileWriter.hdfsExists() -exception:" + e);
- throw e;
- }
return false;
}
- public boolean hdfsDeletePath(String pathStr) throws Exception
+ public boolean hdfsDeletePath(String pathStr) throws IOException
{
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsDeletePath() - start - Path: " + pathStr);
- try
- {
Path delPath = new Path(pathStr );
delPath = delPath.makeQualified(delPath.toUri(), null);
FileSystem fs = FileSystem.get(delPath.toUri(),conf);
fs.delete(delPath, true);
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.hdfsDeletePath() --exception:" + e);
- throw e;
- }
-
return true;
}
- private boolean init(String zkServers, String zkPort)
- throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException
+ private boolean init(String zkServers, String zkPort) throws IOException
+ , ServiceException
{
logger.debug("SequenceFileWriter.init(" + zkServers + ", " + zkPort + ") called.");
if (conf != null)
return true;
conf = HBaseConfiguration.create();
- if (zkServers.length() > 0)
- conf.set("hbase.zookeeper.quorum", zkServers);
- if (zkPort.length() > 0)
- conf.set("hbase.zookeeper.property.clientPort", zkPort);
HBaseAdmin.checkHBaseAvailable(conf);
return true;
}
public boolean createSnapshot( String tableName, String snapshotName)
- throws MasterNotRunningException, IOException, SnapshotCreationException,
- InterruptedException, ZooKeeperConnectionException, ServiceException, Exception
+ throws IOException
{
- try
- {
if (admin == null)
admin = new HBaseAdmin(conf);
admin.snapshot(snapshotName, tableName);
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.createSnapshot() - Snapshot created: " + snapshotName);
- }
- catch (Exception e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.createSnapshot() - Exception: " + e);
- throw e;
- }
return true;
}
+
public boolean verifySnapshot( String tableName, String snapshotName)
- throws MasterNotRunningException, IOException, SnapshotCreationException,
- InterruptedException, ZooKeeperConnectionException, ServiceException, Exception
+ throws IOException
{
- try
- {
if (admin == null)
admin = new HBaseAdmin(conf);
List<SnapshotDescription> lstSnaps = admin.listSnapshots();
@@ -426,12 +348,6 @@ public class SequenceFileWriter {
return true;
}
}
- }
- catch (Exception e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.verifySnapshot() - Exception: " + e);
- throw e;
- }
return false;
}
@@ -439,20 +355,11 @@ public class SequenceFileWriter {
throws MasterNotRunningException, IOException, SnapshotCreationException,
InterruptedException, ZooKeeperConnectionException, ServiceException, Exception
{
- try
- {
if (admin == null)
admin = new HBaseAdmin(conf);
admin.deleteSnapshot(snapshotName);
if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.deleteSnapshot() - Snapshot deleted: " + snapshotName);
- }
- catch (Exception e)
- {
- if (logger.isDebugEnabled()) logger.debug("SequenceFileWriter.deleteSnapshot() - Exception: " + e);
- throw e;
- }
-
- return true;
+ return true;
}
public boolean release() throws IOException
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/StringArrayList.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/StringArrayList.java b/core/sql/src/main/java/org/trafodion/sql/StringArrayList.java
deleted file mode 100644
index 6a2672c..0000000
--- a/core/sql/src/main/java/org/trafodion/sql/StringArrayList.java
+++ /dev/null
@@ -1,47 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.sql;
-
-import java.util.ArrayList;
-
-public class StringArrayList extends ArrayList<String> {
-
- private static final long serialVersionUID = -3557219338406352735L;
-
- void addElement(String st) {
- add(st);
- }
-
- String getElement(int i) {
- if (size() == 0)
- return null;
- else if (i < size())
- return get(i);
- else
- return null;
- }
-
- int getSize() {
- return size();
- }
-
-}
[2/4] incubator-trafodion git commit: [TRAFODION-1988] Better Java
exception handling in Trafodion
Posted by db...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index 3904253..2c7e6af 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.HashMap;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -128,6 +130,7 @@ import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionPro
import org.apache.hadoop.hbase.coprocessor.transactional.generated.SsccRegionProtos.SsccPutTransactionalRequest;
import org.apache.hadoop.hbase.client.transactional.SsccUpdateConflictException;
+import com.google.protobuf.ServiceException;
/**
* Transaction Manager. Responsible for committing transactions.
*/
@@ -232,20 +235,13 @@ public class TransactionManager {
public void init(final TmDDL tmddl) throws IOException {
this.config = HBaseConfiguration.create();
this.tmDDL = tmddl;
- try {
hbadmin = new HBaseAdmin(config);
- }
- catch(Exception e) {
- System.out.println("ERROR: Unable to obtain HBase accessors, Exiting " + e);
- e.printStackTrace();
- System.exit(1);
- }
}
/**
* TransactionManagerCallable : inner class for creating asynchronous requests
*/
- private abstract class TransactionManagerCallable implements Callable<Integer>{
+ private abstract class TransactionManagerCallable implements Callable<Integer> {
TransactionState transactionState;
TransactionRegionLocation location;
HTable table;
@@ -253,16 +249,11 @@ public class TransactionManager {
byte[] endKey_orig;
byte[] endKey;
- TransactionManagerCallable(TransactionState txState, TransactionRegionLocation location, HConnection connection) {
+ TransactionManagerCallable(TransactionState txState, TransactionRegionLocation location, HConnection connection)
+ throws IOException {
transactionState = txState;
this.location = location;
- try {
table = new HTable(location.getRegionInfo().getTable(), connection, cp_tpe);
- } catch(IOException e) {
- e.printStackTrace();
- LOG.error("Error obtaining HTable instance " + e);
- table = null;
- }
startKey = location.getRegionInfo().getStartKey();
endKey_orig = location.getRegionInfo().getEndKey();
if(endKey_orig == null || endKey_orig == HConstants.EMPTY_END_ROW)
@@ -321,11 +312,16 @@ public class TransactionManager {
" ignoreUnknownTransaction: " + ignoreUnknownTransaction + " table: " +
table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
+ } catch (ServiceException se) {
+ String msg = new String ("ERROR occurred while calling doCommitX coprocessor service in doCommitX for transaction: "
+ + transactionId + " participantNum " + participantNum );
+ LOG.error(msg, se);
+ throw new RetryTransactionException(msg,se);
} catch (Throwable e) {
String msg = new String ("ERROR occurred while calling doCommitX coprocessor service in doCommitX for transaction: "
- + transactionId + " participantNum " + participantNum + " Exception: " + e);
- LOG.error(msg);
- throw new Exception(msg);
+ + transactionId + " participantNum " + participantNum );
+ LOG.error(msg, e);
+ throw new DoNotRetryIOException(msg,e);
}
if(result.size() == 0) {
if(LOG.isTraceEnabled()) LOG.trace("doCommitX,received incorrect result size: " + result.size() + " txid: "
@@ -389,12 +385,12 @@ public class TransactionManager {
}
else {
LOG.error("doCommitX, coprocessor UnknownTransactionException: " + cresponse.getException());
- throw new UnknownTransactionException();
+ throw new UnknownTransactionException(cresponse.getException());
}
}
else {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX coprocessor exception: " + cresponse.getException());
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
}
@@ -411,7 +407,7 @@ public class TransactionManager {
}
else {
LOG.error("doCommitX, coprocessor UnknownTransactionException: " + cresponse.getException());
- throw new UnknownTransactionException();
+ throw new UnknownTransactionException(cresponse.getException());
}
}
else if(exceptionString.contains("Asked to commit a non-pending transaction")) {
@@ -419,7 +415,7 @@ public class TransactionManager {
}
else {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX coprocessor exception: " + cresponse.getException());
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
}
@@ -429,22 +425,23 @@ public class TransactionManager {
}
catch (UnknownTransactionException ute) {
LOG.error("Got unknown exception in doCommitX by participant " + participantNum
- + " for transaction: " + transactionId + " " + ute);
- transactionState.requestPendingCountDec(true);
- throw new UnknownTransactionException();
+ + " for transaction: " + transactionId, ute);
+ transactionState.requestPendingCountDec(true);
+ throw ute;
}
- catch (Exception e) {
- if(e.toString().contains("Asked to commit a non-pending transaction")) {
- LOG.error("doCommitX transaction: "
- + transactionId + " will not retry: " + e);
- refresh = false;
- retry = false;
- }
- else {
- LOG.error("doCommitX retrying transaction: " + transactionId + " due to Exception: " + e);
- refresh = true;
- retry = true;
+ catch (RetryTransactionException rte) {
+ if(retryCount == RETRY_ATTEMPTS) {
+ LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
+ // We have received our reply in the form of an exception,
+ // so decrement outstanding count and wake up waiters to avoid
+ // getting hung forever
+ transactionState.requestPendingCountDec(true);
+ throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " +
+ transactionId, rte);
}
+ LOG.error("doCommitX retrying transaction: " + transactionId + " due to Exception: ", rte);
+ refresh = true;
+ retry = true;
}
if (refresh) {
@@ -453,15 +450,6 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + " endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS) {
- LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
- // We have received our reply in the form of an exception,
- // so decrement outstanding count and wake up waiters to avoid
- // getting hung forever
- transactionState.requestPendingCountDec(true);
- throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
- }
-
if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- " + table.toString() + " location being refreshed");
if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- lv_hri: " + lv_hri);
if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo());
@@ -472,7 +460,7 @@ public class TransactionManager {
retryCount++;
- if (retryCount < RETRY_ATTEMPTS && retry == true) {
+ if (retryCount < RETRY_ATTEMPTS && retry == true) {
try {
Thread.sleep(retrySleep);
} catch(InterruptedException ex) {
@@ -516,10 +504,14 @@ public class TransactionManager {
" ignoreUnknownTransaction: " + ignoreUnknownTransaction + " table: " +
table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
result = table.coprocessorService(SsccRegionService.class, startKey, endKey, callable);
+ } catch (ServiceException se) {
+ String msg = "ERROR occurred while calling doCommitX coprocessor service in doCommitX";
+ LOG.error(msg + ":", se);
+ throw new DoNotRetryIOException(msg, se);
} catch (Throwable e) {
String msg = "ERROR occurred while calling doCommitX coprocessor service in doCommitX";
- LOG.error(msg + ":" + e);
- throw new Exception(msg);
+ LOG.error(msg + ":", e);
+ throw new RetryTransactionException(msg, e);
}
if(result.size() != 1) {
LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: " + transactionId);
@@ -537,29 +529,38 @@ public class TransactionManager {
}
else {
LOG.error("doCommitX, coprocessor UnknownTransactionException: " + cresponse.getException());
- throw new UnknownTransactionException();
+ throw new UnknownTransactionException(cresponse.getException());
}
}
else {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX coprocessor exception: " + cresponse.getException());
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
}
retry = false;
}
}
- catch (Exception e) {
- if(e instanceof UnknownTransactionException) {
- String errMsg = new String("Got unknown exception in doCommitX by participant " + participantNum
- + " for transaction: " + transactionId + " " + e);
- LOG.error(errMsg);
+ catch (UnknownTransactionException ute) {
+ String errMsg = new String("Got unknown exception in doCommitX by participant " + participantNum
+ + " for transaction: " + transactionId);
+ LOG.error(errMsg, ute);
+ transactionState.requestPendingCountDec(true);
+ throw ute;
+ }
+ catch (RetryTransactionException rte) {
+ if(retryCount == RETRY_ATTEMPTS) {
+ LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId, rte);
+ // We have received our reply in the form of an exception,
+ // so decrement outstanding count and wake up waiters to avoid
+ // getting hung forever
transactionState.requestPendingCountDec(true);
- throw new UnknownTransactionException(errMsg);
+ throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId,
+ rte);
}
LOG.error("doCommitX participant " + participantNum + " retrying transaction "
- + transactionId + " due to Exception: " + e);
+ + transactionId + " due to Exception: " , rte);
refresh = true;
retry = true;
}
@@ -570,22 +571,10 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS) {
- LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
- // We have received our reply in the form of an exception,
- // so decrement outstanding count and wake up waiters to avoid
- // getting hung forever
- transactionState.requestPendingCountDec(true);
- throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
- }
-
-// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different
-// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is different
if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- " + table.toString() + " location being refreshed");
if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- lv_hri: " + lv_hri);
if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo());
table.getRegionLocation(startKey, true);
-// }
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- setting retry, count: " + retryCount);
refresh = false;
}
@@ -607,11 +596,6 @@ public class TransactionManager {
// We have received our reply so decrement outstanding count
transactionState.requestPendingCountDec(false);
- // forget the transaction if all replies have been received. otherwise another thread
- // will do it.
-// if (transactionState.requestAllComplete()){
-
-// }
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- EXIT txid: " + transactionId);
return 0;
}
@@ -661,9 +645,14 @@ public class TransactionManager {
try {
result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
+ } catch (ServiceException se) {
+ String errMsg = new String("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId + ":");
+ LOG.error(errMsg, se);
+ throw new RetryTransactionException("Unable to call prepare, coprocessor error", se);
} catch (Throwable e) {
- LOG.error("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId + ":" + e);
- throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error");
+ String errMsg = new String("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId + ":");
+ LOG.error(errMsg, e);
+ throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error", e);
}
if(result.size() == 0) {
@@ -685,7 +674,7 @@ public class TransactionManager {
}
else {
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " + cresponse.getException());
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
if(value == TransactionalReturn.COMMIT_RESEND) {
@@ -705,8 +694,12 @@ public class TransactionManager {
retry = false;
}
else {
- // Pause for split to complete and retry
- Thread.sleep(100);
+ try {
+ // Pause for split to complete and retry
+ Thread.sleep(100);
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
retry = true;
}
}
@@ -731,7 +724,7 @@ public class TransactionManager {
else {
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " +
cresponse.getException());
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
}
@@ -745,17 +738,17 @@ public class TransactionManager {
retry = false;
}
}
- catch(Exception e) {
- String exceptionString = e.toString();
- if(e instanceof UnknownTransactionException) {
- String errMsg = new String("doPrepareX participant " + participantNum + " transaction "
- + transactionId + " unknown transaction : " + e);
- LOG.warn(errMsg);
+ catch(RetryTransactionException rte) {
+ if (retryCount == RETRY_ATTEMPTS){
+ LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount, rte);
+ // We have received our reply in the form of an exception,
+ // so decrement outstanding count and wake up waiters to avoid
+ // getting hung forever
transactionState.requestPendingCountDec(true);
- throw new UnknownTransactionException(errMsg);
+ throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount, rte);
}
LOG.error("doPrepareX participant " + participantNum + " retrying transaction "
- + transactionId + " due to Exception: " + e);
+ + transactionId + " due to Exception: " , rte);
refresh = true;
retry = true;
}
@@ -766,23 +759,12 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS){
- LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount);
- // We have received our reply in the form of an exception,
- // so decrement outstanding count and wake up waiters to avoid
- // getting hung forever
- transactionState.requestPendingCountDec(true);
- throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount);
- }
-// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different
-// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is different
if (LOG.isWarnEnabled()) LOG.warn("doPrepareX -- " + table.toString() + " location being refreshed");
if (LOG.isWarnEnabled()) LOG.warn("doPrepareX -- lv_hri: " + lv_hri);
if (LOG.isWarnEnabled()) LOG.warn("doPrepareX -- location.getRegionInfo(): " + location.getRegionInfo());
table.getRegionLocation(startKey, true);
LOG.debug("doPrepareX retry count: " + retryCount);
-// }
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- setting retry, count: " + retryCount);
refresh = false;
}
@@ -826,8 +808,8 @@ public class TransactionManager {
try {
result = table.coprocessorService(SsccRegionService.class, startKey, endKey, callable);
} catch (Throwable e) {
- LOG.error("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId + ":" + e);
- throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error");
+ LOG.error("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId, e);
+ throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error", e);
}
if(result.size() != 1) {
@@ -843,23 +825,23 @@ public class TransactionManager {
commitStatus = value;
if(cresponse.getHasException()) {
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " + cresponse.getException());
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
retry = false;
}
}
- catch(Exception e) {
- String exceptionString = e.toString();
- if(e instanceof UnknownTransactionException) {
- String errMsg = new String("doPrepareX participant " + participantNum + " transaction "
- + transactionId + " unknown transaction : " + e);
- LOG.warn(errMsg);
+ catch (RetryTransactionException e) {
+ if (retryCount == RETRY_ATTEMPTS) {
+ LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount, e);
+ // We have received our reply in the form of an exception,
+ // so decrement outstanding count and wake up waiters to avoid
+ // getting hung forever
transactionState.requestPendingCountDec(true);
- throw new UnknownTransactionException(errMsg);
+ throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount, e);
}
LOG.error("doPrepareX participant " + participantNum + " retrying transaction "
- + transactionId + " due to Exception: " + e);
+ + transactionId + " due to Exception: ", e);
refresh = true;
retry = true;
}
@@ -870,23 +852,12 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS){
- LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount);
- // We have received our reply in the form of an exception,
- // so decrement outstanding count and wake up waiters to avoid
- // getting hung forever
- transactionState.requestPendingCountDec(true);
- throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount);
- }
-// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different
-// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is different
if (LOG.isWarnEnabled()) LOG.warn("doPrepareX -- " + table.toString() + " location being refreshed");
if (LOG.isWarnEnabled()) LOG.warn("doPrepareX -- lv_hri: " + lv_hri);
if (LOG.isWarnEnabled()) LOG.warn("doPrepareX -- location.getRegionInfo(): " + location.getRegionInfo());
table.getRegionLocation(startKey, true);
LOG.debug("doPrepareX retry count: " + retryCount);
-// }
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- setting retry, count: " + retryCount);
refresh = false;
}
@@ -993,11 +964,16 @@ public class TransactionManager {
+ transactionId + " table: " + table.toString() + " startKey: "
+ new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
- } catch (Throwable e) {
+ } catch (ServiceException se) {
String msg = "ERROR occurred while calling doAbortX coprocessor service";
- LOG.error(msg + ":" + e);
- throw new Exception(msg);
+ LOG.error(msg, se);
+ throw new RetryTransactionException(msg, se);
+ } catch (Throwable t) {
+ String msg = "ERROR occurred while calling doAbortX coprocessor service";
+ LOG.error(msg, t);
+ throw new DoNotRetryIOException(msg, t);
}
+
if(result.size() == 0) {
LOG.error("doAbortX, received 0 region results for transaction: " + transactionId
@@ -1009,54 +985,47 @@ public class TransactionManager {
for (AbortTransactionResponse cresponse : result.values()) {
if(cresponse.getHasException()) {
String exceptionString = new String (cresponse.getException());
- LOG.error("Abort of transaction: " + transactionId
+ String errMsg = new String("Abort of transaction: " + transactionId
+ " participantNum: " + participantNum + " region: " + Bytes.toString(regionName)
+ " threw Exception: " + exceptionString);
+ LOG.error(errMsg);
if(exceptionString.contains("UnknownTransactionException")) {
- throw new UnknownTransactionException();
+ throw new UnknownTransactionException(errMsg);
}
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
retry = false;
}
}
- catch (UnknownTransactionException ute) {
- LOG.error("UnknownTransactionException in doAbortX for transaction: " + transactionId
- + " participantNum: " + participantNum + " region: "
- + Bytes.toString(regionName) + "(ignoring): " + ute ); }
- catch (Exception e) {
- if(e.toString().contains("Asked to commit a non-pending transaction ")) {
- LOG.error(" doCommitX will not retry transaction: " + transactionId + " : " + e);
- refresh = false;
- retry = false;
- }
- else {
- LOG.error("doAbortX retrying transaction: " + transactionId + " participantNum: "
- + participantNum + " region: " + Bytes.toString(regionName) + " due to Exception: " + e );
- refresh = true;
- retry = true;
- }
-
+ catch (RetryTransactionException rte) {
+ if (rte.toString().contains("Asked to commit a non-pending transaction ")) {
+ LOG.error(" doCommitX will not retry transaction: " + transactionId , rte);
+ refresh = false;
+ retry = false;
}
- if (refresh) {
-
+ if (retryCount == RETRY_ATTEMPTS) {
+ String errMsg = "Exceeded retry attempts in doAbortX: " + retryCount;
+ LOG.error(errMsg, rte);
+ throw new DoNotRetryIOException(errMsg, rte);
+ }
+ else {
+ LOG.error("doAbortX retrying transaction: " + transactionId + " participantNum: "
+ + participantNum + " region: " + Bytes.toString(regionName) + " due to Exception: " ,rte );
+ refresh = true;
+ retry = true;
+ }
+ }
+ if (refresh) {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS){
- LOG.error("Exceeded retry attempts in doAbortX: " + retryCount + " (ingoring)");
- }
-
-// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different
-// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is different
if (LOG.isWarnEnabled()) LOG.warn("doAbortX -- " + table.toString() + " location being refreshed");
if (LOG.isWarnEnabled()) LOG.warn("doAbortX -- lv_hri: " + lv_hri);
if (LOG.isWarnEnabled()) LOG.warn("doAbortX -- location.getRegionInfo(): " + location.getRegionInfo());
table.getRegionLocation(startKey, true);
-// }
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- setting retry, count: " + retryCount);
refresh = false;
}
@@ -1101,10 +1070,14 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- before coprocessorService txid: " + transactionId + " table: " +
table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
result = table.coprocessorService(SsccRegionService.class, startKey, endKey, callable);
+ } catch (ServiceException se) {
+ String msg = "ERROR occurred while calling doAbortX coprocessor service";
+ LOG.error(msg + ":", se);
+ throw new DoNotRetryIOException(msg, se);
} catch (Throwable e) {
String msg = "ERROR occurred while calling doAbortX coprocessor service";
- LOG.error(msg + ":" + e);
- throw new Exception(msg);
+ LOG.error(msg + ":", e);
+ throw new DoNotRetryIOException(msg,e);
}
if(result.size() != 1) {
@@ -1116,23 +1089,25 @@ public class TransactionManager {
for (SsccAbortTransactionResponse cresponse : result.values()) {
if(cresponse.getHasException()) {
String exceptionString = cresponse.getException();
- LOG.error("Abort of transaction: " + transactionId + " threw Exception: " + exceptionString);
+ String errMsg = new String("Abort of transaction: " + transactionId + " threw Exception: " + exceptionString);
+ LOG.error(errMsg);
if(exceptionString.contains("UnknownTransactionException")) {
- throw new UnknownTransactionException();
+ throw new UnknownTransactionException(errMsg);
}
- throw new Exception(cresponse.getException());
+ throw new RetryTransactionException(cresponse.getException());
}
}
retry = false;
- }
}
- catch (UnknownTransactionException ute) {
- LOG.debug("UnknownTransactionException in doAbortX by participant " + participantNum +
- " for transaction: " + transactionId + "(ignoring): " + ute);
}
- catch (Exception e) {
+ catch (RetryTransactionException rte) {
+ if (retryCount == RETRY_ATTEMPTS){
+ String errMsg = new String ("Exceeded retry attempts in doAbortX: " + retryCount + " (Not ingoring)");
+ LOG.error(errMsg);
+ throw new RollbackUnsuccessfulException(errMsg, rte);
+ }
LOG.error("doAbortX participant " + participantNum + " retrying transaction "
- + transactionId + " due to Exception: " + e);
+ + transactionId + " due to Exception: " + rte);
refresh = true;
retry = true;
}
@@ -1143,17 +1118,10 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS){
- LOG.error("Exceeded retry attempts in doAbortX: " + retryCount + " (ingoring)");
- }
-
-// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different
-// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is different
if (LOG.isWarnEnabled()) LOG.warn("doAbortX -- " + table.toString() + " location being refreshed");
if (LOG.isWarnEnabled()) LOG.warn("doAbortX -- lv_hri: " + lv_hri);
if (LOG.isWarnEnabled()) LOG.warn("doAbortX -- location.getRegionInfo(): " + location.getRegionInfo());
table.getRegionLocation(startKey, true);
-// }
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- setting retry, count: " + retryCount);
refresh = false;
}
@@ -1211,11 +1179,16 @@ public class TransactionManager {
TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
commitMultipleResponse = trxService.commitMultiple(null, commitMultipleRequest);
retry = false;
- } catch (Throwable e) {
- LOG.error("doCommitX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":" + e);
+ } catch (ServiceException se) {
+ String errMsg = "doCommitX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId;
+ LOG.error(errMsg, se);
refresh = true;
retry = true;
- }
+ } catch (Throwable e) {
+ String errMsg = "doCommitX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId;
+ LOG.error(errMsg,e);
+ throw new CommitUnsuccessfulException(errMsg, e);
+ }
if(!retry) {
List<String> exceptions = commitMultipleResponse.getExceptionList();
@@ -1225,16 +1198,15 @@ public class TransactionManager {
}
}
}
- catch (Exception e) {
- if(e instanceof UnknownTransactionException) {
- String errMsg = new String("Got unknown exception in doCommitX for transaction: " + transactionId
- + " participant " + participantNum + " " + e);
- LOG.error(errMsg);
+ catch (RetryTransactionException rte) {
+ if(retryCount == RETRY_ATTEMPTS) {
+ LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId, rte);
transactionState.requestPendingCountDec(true);
- throw new UnknownTransactionException(errMsg);
+ throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId,
+ rte);
}
LOG.error("doCommitX retrying transaction " + transactionId
- + " participant " + participantNum + " due to Exception: " + e);
+ + " participant " + participantNum + " due to Exception: ", rte);
refresh = true;
retry = true;
}
@@ -1245,11 +1217,6 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS) {
- LOG.error("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
- transactionState.requestPendingCountDec(true);
- throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
- }
if (LOG.isWarnEnabled()) {
LOG.warn("doCommitX -- " + table.toString() + " location being refreshed");
@@ -1296,11 +1263,15 @@ public class TransactionManager {
TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
commitMultipleResponse = trxService.commitRequestMultiple(null, commitMultipleRequest);
retry = false;
- } catch (Throwable e) {
- LOG.error("doPrepareX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":" + e);
+ } catch (ServiceException se) {
+ String errMsg = new String("doPrepareX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId );
+ LOG.error(errMsg, se);
refresh = true;
retry = true;
- //throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error");
+ } catch (Throwable e) {
+ String errMsg = "doPrepareX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId;
+ LOG.error(errMsg, e);
+ throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error", e);
}
if(!retry) {
results = commitMultipleResponse.getResultList();
@@ -1310,16 +1281,17 @@ public class TransactionManager {
}
}
- catch(Exception e) {
- if(e instanceof UnknownTransactionException) {
- String errMsg = new String("UnknownTransaction in doPrepareX - Batch - by participant "
- + participantNum + " for transaction " + transactionId + " " + e);
- LOG.error(errMsg);
- transactionState.requestPendingCountDec(true);
- throw new UnknownTransactionException(errMsg);
+ catch(RetryTransactionException rte) {
+ if(retryCount == RETRY_ATTEMPTS){
+ LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount, rte);
+ // We have received our reply in the form of an exception,
+ // so decrement outstanding count and wake up waiters to avoid
+ // getting hung forever
+ transactionState.requestPendingCountDec(true);
+ throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount, rte);
}
LOG.error("doPrepareX - Batch - retrying for participant "
- + participantNum + " transaction " + transactionId + " due to Exception: " + e);
+ + participantNum + " transaction " + transactionId + " due to Exception: ", rte);
refresh = true;
retry = true;
}
@@ -1329,14 +1301,6 @@ public class TransactionManager {
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -Batch- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
- if(retryCount == RETRY_ATTEMPTS){
- LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount);
- // We have received our reply in the form of an exception,
- // so decrement outstanding count and wake up waiters to avoid
- // getting hung forever
- transactionState.requestPendingCountDec(true);
- throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount);
- }
if (LOG.isWarnEnabled()) {
LOG.warn("doPrepareX -Batch- " + table.toString() + " location being refreshed");
LOG.warn("doPrepareX -Batch- lv_hri: " + lv_hri);
@@ -1431,10 +1395,14 @@ public class TransactionManager {
TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
abortTransactionMultipleResponse = trxService.abortTransactionMultiple(null, abortTransactionMultipleRequest);
retry = false;
- } catch (Throwable e) {
- LOG.error("doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":" + e);
+ } catch (ServiceException se) {
+ LOG.error("doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":",se);
refresh = true;
retry = true;
+ } catch (Throwable e) {
+ String errMsg = "doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId;
+ LOG.error(errMsg, e);
+ throw new RollbackUnsuccessfulException("doAbortX, Batch - coprocessor error", e);
}
if(!retry) {
List<String> exceptions = abortTransactionMultipleResponse.getExceptionList();
@@ -1444,15 +1412,16 @@ public class TransactionManager {
}
}
}
- catch (UnknownTransactionException ute) {
- LOG.debug("UnknownTransactionException in doAbortX - Batch - by participant " + participantNum
- + " for transaction: " + transactionId + "(ignoring): " + ute);
- }
- catch (Exception e) {
- LOG.error("doAbortX - Batch - participant " + participantNum + " retrying transaction "
- + transactionId + " due to Exception: " + e);
- refresh = true;
- retry = true;
+ catch (RetryTransactionException rte) {
+ if(retryCount == RETRY_ATTEMPTS){
+ String errMsg = "Exceeded retry attempts in doAbortX: " + retryCount + " (not ingoring)";
+ LOG.error(errMsg, rte);
+ throw new RollbackUnsuccessfulException("doAbortX, Batch - coprocessor error", rte);
+ }
+ LOG.error("doAbortX - Batch - participant " + participantNum + " retrying transaction "
+ + transactionId + " due to Exception: ", rte);
+ refresh = true;
+ retry = true;
}
if (refresh) {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
@@ -1500,6 +1469,7 @@ public class TransactionManager {
// No need to add to retry list, throw exception if not ignoring
logException.append("Encountered " + exception + " on region: " +
locations.get(i).getRegionInfo().getRegionNameAsString());
+ throw new DoNotRetryIOException(logException.toString());
}
else if (exception.equals(BatchException.EXCEPTION_RETRY_ERR.toString()) ||
exception.equals(BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString())) {
@@ -1508,7 +1478,7 @@ public class TransactionManager {
ts.addRegionToRetry(locations.get(i));
}
if(logException.length() > 0) {
- throw new IOException(logException.toString());
+ throw new RetryTransactionException(logException.toString());
}
}
if(LOG.isTraceEnabled()) LOG.trace("checkException -- EXIT txid: " + ts.getTransactionId());
@@ -1621,8 +1591,8 @@ public class TransactionManager {
idServer.id(ID_TM_SERVER_TIMEOUT, startId);
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction (local) idServer.id returned: " + startId.val);
} catch (IdTmException exc) {
- LOG.error("beginTransaction (local) : IdTm threw exception " + exc);
- throw new IdTmException("beginTransaction (local) : IdTm threw exception " + exc);
+ LOG.error("beginTransaction (local) : IdTm threw exception ", exc);
+ throw new IdTmException("beginTransaction (local) : IdTm threw exception ", exc);
}
startIdVal = startId.val;
}
@@ -1657,7 +1627,6 @@ public class TransactionManager {
// (need one CompletionService per request for thread safety, can share pool of threads
CompletionService<Integer> compPool = new ExecutorCompletionService<Integer>(threadPool);
- try {
ServerName servername;
List<TransactionRegionLocation> regionList;
Map<ServerName, List<TransactionRegionLocation>> locations = new HashMap<ServerName, List<TransactionRegionLocation>>();
@@ -1682,15 +1651,23 @@ public class TransactionManager {
}
});
}
- } catch (Exception e) {
- throw new CommitUnsuccessfulException(e);
- }
// loop to retrieve replies
int commitError = 0;
- try {
for (int loopIndex = 0; loopIndex < loopCount; loopIndex ++) {
- Integer canCommit = compPool.take().get();
+ boolean loopExit = false;
+ Integer canCommit = null;
+ do
+ {
+ try {
+ canCommit = compPool.take().get();
+ loopExit = true;
+ }
+ catch (InterruptedException ie) {}
+ catch (ExecutionException e) {
+ throw new CommitUnsuccessfulException(e);
+ }
+ } while (loopExit == false);
switch (canCommit) {
case TM_COMMIT_TRUE:
allReadOnly = false;
@@ -1727,7 +1704,19 @@ public class TransactionManager {
transactionState.clearRetryRegions();
}
for (int loopIndex = 0; loopIndex < loopCount; loopIndex ++) {
- Integer canCommit = compPool.take().get();
+ boolean loopExit = false;
+ Integer canCommit = null;
+ do
+ {
+ try {
+ canCommit = compPool.take().get();
+ loopExit = true;
+ }
+ catch (InterruptedException ie) {}
+ catch (ExecutionException e) {
+ throw new CommitUnsuccessfulException(e);
+ }
+ } while (loopExit == false);
switch (canCommit) {
case TM_COMMIT_TRUE:
allReadOnly = false;
@@ -1746,9 +1735,6 @@ public class TransactionManager {
commitError = TransactionalReturn.COMMIT_UNSUCCESSFUL;;
}
}
- }catch (Exception e) {
- throw new CommitUnsuccessfulException(e);
- }
if(commitError != 0)
return commitError;
@@ -1771,7 +1757,6 @@ public class TransactionManager {
// (need one CompletionService per request for thread safety, can share pool of threads
CompletionService<Integer> compPool = new ExecutorCompletionService<Integer>(threadPool);
- try {
if(batchRSMetricsFlag)
locations = new HashMap<ServerName, List<TransactionRegionLocation>>();
@@ -1824,18 +1809,22 @@ public class TransactionManager {
}
metricsCount++;
}
-
- } catch (Exception e) {
- LOG.error("exception in prepareCommit for transaction: "
- + transactionState.getTransactionId() + " (during submit to pool): " + e);
- throw new CommitUnsuccessfulException(e);
- }
-
// loop to retrieve replies
int commitError = 0;
- try {
for (int loopIndex = 0; loopIndex < loopCount; loopIndex ++) {
- int canCommit = compPool.take().get();
+ boolean loopExit = false;
+ Integer canCommit = null;
+ do
+ {
+ try {
+ canCommit = compPool.take().get();
+ loopExit = true;
+ }
+ catch (InterruptedException ie) {}
+ catch (ExecutionException e) {
+ throw new CommitUnsuccessfulException(e);
+ }
+ } while (loopExit == false);
switch (canCommit) {
case TM_COMMIT_TRUE:
allReadOnly = false;
@@ -1855,11 +1844,6 @@ public class TransactionManager {
commitError = TransactionalReturn.COMMIT_UNSUCCESSFUL;;
}
}
- }catch (Exception e) {
- LOG.error("exception in prepareCommit for transaction: "
- + transactionState.getTransactionId() + " (during completion processing): " + e);
- throw new CommitUnsuccessfulException(e);
- }
if(commitError != 0)
return commitError;
@@ -1956,7 +1940,7 @@ public class TransactionManager {
+ ((EnvironmentEdgeManager.currentTime() - startTime)) + "]ms");
}
- public void retryCommit(final TransactionState transactionState, final boolean ignoreUnknownTransaction) {
+ public void retryCommit(final TransactionState transactionState, final boolean ignoreUnknownTransaction) throws IOException {
if(LOG.isTraceEnabled()) LOG.trace("retryCommit -- ENTRY -- txid: " + transactionState.getTransactionId());
synchronized(transactionState.getRetryRegions()) {
List<TransactionRegionLocation> completedList = new ArrayList<TransactionRegionLocation>();
@@ -1985,7 +1969,7 @@ public class TransactionManager {
if(LOG.isTraceEnabled()) LOG.trace("retryCommit -- EXIT -- txid: " + transactionState.getTransactionId());
}
- public void retryAbort(final TransactionState transactionState) {
+ public void retryAbort(final TransactionState transactionState) throws IOException {
if(LOG.isTraceEnabled()) LOG.trace("retryAbort -- ENTRY -- txid: " + transactionState.getTransactionId());
synchronized(transactionState.getRetryRegions()) {
List<TransactionRegionLocation> completedList = new ArrayList<TransactionRegionLocation>();
@@ -2016,7 +2000,7 @@ public class TransactionManager {
* @throws CommitUnsuccessfulException
*/
public void doCommit(final TransactionState transactionState)
- throws CommitUnsuccessfulException, UnsuccessfulDDLException {
+ throws CommitUnsuccessfulException, UnsuccessfulDDLException, IOException {
if (LOG.isTraceEnabled()) LOG.trace("doCommit [" + transactionState.getTransactionId() +
"] ignoreUnknownTransaction not supplied");
doCommit(transactionState, false);
@@ -2030,10 +2014,9 @@ public class TransactionManager {
* @throws CommitUnsuccessfulException
*/
public void doCommit(final TransactionState transactionState, final boolean ignoreUnknownTransaction)
- throws CommitUnsuccessfulException, UnsuccessfulDDLException {
+ throws CommitUnsuccessfulException, UnsuccessfulDDLException, IOException {
int loopCount = 0;
if (batchRegionServer && (TRANSACTION_ALGORITHM == AlgorithmType.MVCC)) {
- try {
if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() +
"] ignoreUnknownTransaction: " + ignoreUnknownTransaction);
// Set the commitId
@@ -2072,19 +2055,8 @@ public class TransactionManager {
}
});
}
- } catch (Exception e) {
- LOG.error("exception in doCommit for transaction: " + transactionState.getTransactionId() + " " + e);
- // This happens on a NSRE that is triggered by a split
- throw new CommitUnsuccessfulException(e);
- }
-
// all requests sent at this point, can record the count
transactionState.completeSendInvoke(loopCount);
- /*
- try {
- Thread.sleep(500);
- } catch(Exception e) {}
- */
}
else {
// non batch-rs
@@ -2093,7 +2065,6 @@ public class TransactionManager {
"] ignoreUnknownTransactionn: " + ignoreUnknownTransaction);
if (LOG.isTraceEnabled()) LOG.trace("sending commits for ts: " + transactionState);
- try {
int participants = transactionState.participatingRegions.size() - transactionState.regionsToIgnore.size();
if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() + "] with " + participants + " participants" );
// (Asynchronously send commit
@@ -2121,20 +2092,9 @@ public class TransactionManager {
}
});
}
- } catch (Exception e) {
- LOG.error("exception in doCommit for transaction: "
- + transactionState.getTransactionId() + " " + e);
- // This happens on a NSRE that is triggered by a split
- throw new CommitUnsuccessfulException(e);
- }
// all requests sent at this point, can record the count
transactionState.completeSendInvoke(loopCount);
- /*
- try {
- Thread.sleep(500);
- } catch(Exception e) {}
- */
}
//if DDL is involved with this transaction, need to complete it.
@@ -2142,25 +2102,20 @@ public class TransactionManager {
{
//First wait for commit requests sent to all regions is received back.
//This TM thread gets SUSPENDED until all commit threads complete!!!
- try{
+ boolean loopExit = false;
+ do
+ {
+ try {
transactionState.completeRequest();
- }
- catch(Exception e){
- LOG.error("Exception in doCommit completeRequest. txID: " + transactionState.getTransactionId() + "Exception: " + e);
- //return; //Do not return here. This thread should continue servicing DDL operations.
- }
+ loopExit = true;
+ }
+ catch (InterruptedException ie) {}
+ } while (loopExit == false);
if (LOG.isDebugEnabled()) LOG.debug("doCommit() [" + transactionState.getTransactionId()
+ "] performing commit DDL");
-
- try{
doCommitDDL(transactionState);
-
- } catch (Exception e) {
- LOG.error("FATAL Exception calling doCommitDDL for transaction: " + transactionState.getTransactionId() + "Exception: " + e);
- throw new UnsuccessfulDDLException(e);
- }
}
}
@@ -2352,7 +2307,7 @@ public class TransactionManager {
if (transactionState.getRegionsToIgnore().contains(location)) {
continue;
}
- try {
+ //try {
loopCount++;
final int participantNum = loopCount;
final byte[] regionName = location.getRegionInfo().getRegionName();
@@ -2365,9 +2320,11 @@ public class TransactionManager {
return doAbortX(regionName, transactionState.getTransactionId(), participantNum, location.isTableRecodedDropped());
}
});
+/*
} catch (Exception e) {
LOG.error("exception in abort: " + e);
}
+*/
/*
} catch (UnknownTransactionException e) {
LOG.error("exception in abort: " + e);
@@ -2659,10 +2616,9 @@ public class TransactionManager {
}
public void createTable(final TransactionState transactionState, HTableDescriptor desc, Object[] beginEndKeys)
- throws Exception{
+ throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("createTable ENTRY, transactionState: " + transactionState.getTransactionId());
- try {
if (beginEndKeys != null && beginEndKeys.length > 0) {
byte[][] keys = new byte[beginEndKeys.length][];
for (int i = 0; i < beginEndKeys.length; i++){
@@ -2681,12 +2637,6 @@ public class TransactionManager {
//record this create in TmDDL.
tmDDL.putRow( transactionState.getTransactionId(), "CREATE", desc.getNameAsString());
- }
- catch (Exception e) {
- LOG.error("createTable Exception TxId: " + transactionState.getTransactionId() + "Exception: " + e);
- throw e;
- }
-
}
private class ChangeFlags {
@@ -2930,10 +2880,9 @@ public class TransactionManager {
public void alterTable(final TransactionState transactionState, String tblName, Object[] tableOptions)
- throws Exception {
+ throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("createTable ENTRY, transactionState: " + transactionState.getTransactionId());
- try {
HTableDescriptor htblDesc = hbadmin.getTableDescriptor(tblName.getBytes());
HColumnDescriptor[] families = htblDesc.getColumnFamilies();
HColumnDescriptor colDesc = families[0]; // Trafodion keeps SQL columns only in first column family
@@ -2957,41 +2906,28 @@ public class TransactionManager {
//record this create in TmDDL.
tmDDL.putRow( transactionState.getTransactionId(), "ALTER", tblName);
-
- }
- catch (Exception e) {
- LOG.error("createTable Exception TxId: " + transactionState.getTransactionId() + "Exception: " + e);
- throw e;
- }
}
public void registerTruncateOnAbort(final TransactionState transactionState, String tblName)
- throws Exception {
+ throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("registerTruncateOnAbort ENTRY, TxID " + transactionState.getTransactionId() +
" tableName: " + tblName);
// register the truncate on abort to TmDDL
- try {
// add truncate record to TmDDL
tmDDL.putRow(transactionState.getTransactionId(), "TRUNCATE", tblName);
// Set transaction state object as participating in ddl transaction.
transactionState.setDDLTx(true);
- }
- catch (Exception e) {
- LOG.error("registerTruncateOnAbort Exception Txid:" + transactionState.getTransactionId() +"TableName: " + tblName + "Exception:" + e);
- throw e;
- }
}
public void dropTable(final TransactionState transactionState, String tblName)
- throws Exception{
+ throws IOException{
if (LOG.isTraceEnabled()) LOG.trace("dropTable ENTRY, TxId: " + transactionState.getTransactionId() + "TableName: " + tblName);
//Record this drop table request in TmDDL.
//Note that physical disable of this table happens in prepare phase.
//Followed by physical drop of this table in commit phase.
- try {
// add drop record to TmDDL.
tmDDL.putRow( transactionState.getTransactionId(), "DROP", tblName);
@@ -3005,16 +2941,11 @@ public class TransactionManager {
if(trl.getRegionInfo().getTable().toString().compareTo(tblName) == 0)
trl.setTableRecordedDropped();
}
- }
- catch (Exception e) {
- LOG.error("dropTable Exception TxId: " + transactionState.getTransactionId() + "TableName:" + tblName + "Exception: " + e);
- throw e;
- }
}
//Called only by Abort or Commit processing.
public void deleteTable(final TransactionState transactionState, final String tblName)
- throws Exception{
+ throws IOException{
if (LOG.isTraceEnabled()) LOG.trace("deleteTable ENTRY, TxId: " + transactionState.getTransactionId() + " tableName " + tblName);
try{
disableTable(transactionState, tblName);
@@ -3024,41 +2955,22 @@ public class TransactionManager {
//if (LOG.isTraceEnabled()) LOG.trace("deleteTable , TableNotEnabledException. This is a expected exception. Step: disableTable, TxId: " +
// transactionState.getTransactionId() + " TableName " + tblName + "Exception: " + e);
}
- catch (Exception e) {
- LOG.error("deleteTable Exception TxId: " + transactionState.getTransactionId() + " TableName " + tblName + "Exception: " + e);
- throw e;
- }
-
- try{
hbadmin.deleteTable(tblName);
- }
- catch (Exception e) {
- LOG.error("deleteTable Exception TxId: " + transactionState.getTransactionId() + " TableName " + tblName + "Exception: " + e);
- throw e;
- }
}
//Called only by Abort processing.
public void enableTable(final TransactionState transactionState, String tblName)
- throws Exception{
+ throws IOException{
if (LOG.isTraceEnabled()) LOG.trace("enableTable ENTRY, TxID: " + transactionState.getTransactionId() + " tableName " + tblName);
- try {
hbadmin.enableTable(tblName);
- }
- catch (Exception e) {
- //LOG.error("enableTable Exception TxId: " + transactionState.getTransactionId() + " TableName " + tblName + "Exception: " + e);
- //Let the caller log this and handle exception. Some scenarios this exception is expected.
- throw e;
- }
}
// Called only by Abort processing to delete data from a table
public void truncateTable(final TransactionState transactionState, String tblName)
- throws Exception{
+ throws IOException{
if (LOG.isTraceEnabled()) LOG.trace("truncateTable ENTRY, TxID: " + transactionState.getTransactionId() +
"table: " + tblName);
- try {
TableName tablename = TableName.valueOf(tblName);
HTableDescriptor hdesc = hbadmin.getTableDescriptor(tablename);
@@ -3068,25 +2980,13 @@ public class TransactionManager {
hbadmin.deleteTable(tblName);
hbadmin.createTable(hdesc);
hbadmin.close();
- }
- catch (Exception e) {
- LOG.error("truncateTable Exception TxId: " + transactionState.getTransactionId() + " TableName " + tblName+ "Exception: " + e);
- throw e;
- }
}
//Called only by DoPrepare.
public void disableTable(final TransactionState transactionState, String tblName)
- throws Exception{
+ throws IOException{
if (LOG.isTraceEnabled()) LOG.trace("disableTable ENTRY, TxID: " + transactionState.getTransactionId() + " tableName " + tblName);
- try {
hbadmin.disableTable(tblName);
- }
- catch (Exception e) {
- //LOG.error("disableTable Exception TxId: " + transactionState.getTransactionId() + " TableName " + tblName + "Exception: " + e);
- //Let the caller handle this exception since table being disabled could be redundant many times.
- throw e;
- }
if (LOG.isTraceEnabled()) LOG.trace("disableTable EXIT, TxID: " + transactionState.getTransactionId() + " tableName " + tblName);
}
@@ -3097,7 +2997,7 @@ public class TransactionManager {
* @return
* @throws Exception
*/
- public List<Long> recoveryRequest (String hostnamePort, byte[] regionArray, int tmid) throws Exception{
+ public List<Long> recoveryRequest (String hostnamePort, byte[] regionArray, int tmid) throws DeserializationException, IOException {
if (LOG.isTraceEnabled()) LOG.trace("recoveryRequest -- ENTRY TM" + tmid);
HRegionInfo regionInfo = null;
HTable table = null;
@@ -3130,19 +3030,7 @@ public class TransactionManager {
}
*/
- try {
regionInfo = HRegionInfo.parseFrom(regionArray);
- }
- catch (Exception de) {
- if (LOG.isTraceEnabled()) LOG.trace("TransactionManager:recoveryRequest exception in regionInfo parseFrom, " +
- " TM : " + tmid +
- " DeserializationException: " + de);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- de.printStackTrace(pw);
- LOG.error("DeserializationException in regionInfo parseFrom, unable to complete recoveryRequest\n" + sw.toString());
- throw new DeserializationException("DeserializationException in regionInfo parseFrom, unable to complete recoveryRequest ");
- }
final String regionName = regionInfo.getRegionNameAsString();
final int tmID = tmid;
@@ -3178,9 +3066,13 @@ public class TransactionManager {
try {
rresult = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
}
- catch (Throwable e) {
- LOG.error("Exception thrown when calling coprocessor: " + e.toString());
- e.printStackTrace();
+ catch (ServiceException se) {
+ LOG.error("Exception thrown when calling coprocessor: ", se);
+ throw new IOException("Problem with calling coprocessor, no regions returned result", se);
+ }
+ catch (Throwable t) {
+ LOG.error("Exception thrown when calling coprocessor: ", t);
+ throw new IOException("Problem with calling coprocessor, no regions returned result", t);
}
Collection<RecoveryRequestResponse> results = rresult.values();
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalScanner.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalScanner.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalScanner.java.tmpl
index 40bbeeb..371f232 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalScanner.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalScanner.java.tmpl
@@ -111,40 +111,28 @@ public class TransactionalScanner extends AbstractClientScanner {
return;
}
this.closed = true;
- if(this.interrupted) {
- if(LOG.isDebugEnabled()) LOG.debug("close() resetting connection, txID: " + ts.getTransactionId());
- try {
- ttable.resetConnection();
- } catch(IOException e) {
- if(LOG.isErrorEnabled()) LOG.error("close() unable to reset connection, txID: " + ts.getTransactionId());
- return;
- }
- this.interrupted = false;
- }
TrxRegionProtos.CloseScannerRequest.Builder requestBuilder = CloseScannerRequest.newBuilder();
requestBuilder.setTransactionId(ts.getTransactionId());
requestBuilder.setRegionName(ByteString.copyFromUtf8(currentRegion.getRegionNameAsString()));
requestBuilder.setScannerId(scannerID);
TrxRegionProtos.CloseScannerRequest closeRequest = requestBuilder.build();
+ TrxRegionProtos.CloseScannerResponse response = null;
try {
CoprocessorRpcChannel channel = ttable.coprocessorService(this.currentBeginKey);
TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
- TrxRegionProtos.CloseScannerResponse response = trxService.closeScanner(null, closeRequest);
- String exception = response.getException();
- if(response.getHasException()) {
- String errMsg = "closeScanner encountered Exception txID: " +
- ts.getTransactionId() + " Exception: " + exception;
- LOG.error(errMsg);
- }
+ response = trxService.closeScanner(null, closeRequest);
}
- catch(ServiceException se) {
- this.interrupted = true;
- this.closed = false;
- }
-
catch (Throwable e) {
String errMsg = "CloseScanner error on coprocessor call, scannerID: " + this.scannerID + " " + e;
LOG.error(errMsg);
+ //throw new IOException(errMsg, e);
+ }
+ if (response.getHasException()) {
+ String exception = response.getException();
+ String errMsg = "closeScanner encountered Exception txID: " +
+ ts.getTransactionId() + " Exception: " + exception;
+ LOG.error(errMsg);
+ //throw new IOException(errMsg);
}
if(LOG.isTraceEnabled()) LOG.trace("close() -- EXIT txID: " + ts.getTransactionId());
@@ -152,11 +140,6 @@ public class TransactionalScanner extends AbstractClientScanner {
protected boolean nextScanner(final boolean done) throws IOException{
if(LOG.isTraceEnabled()) LOG.trace("nextScanner() -- ENTRY txID: " + ts.getTransactionId());
- if(this.interrupted) {
- if(LOG.isDebugEnabled()) LOG.debug("nextScanner() resetting connection, txID: " + ts.getTransactionId());
- ttable.resetConnection();
- this.interrupted = false;
- }
if(this.currentBeginKey != null) {
if(LOG.isTraceEnabled()) LOG.trace("nextScanner() currentBeginKey != null txID: " + ts.getTransactionId());
if (doNotCloseOnLast)
@@ -194,33 +177,28 @@ public class TransactionalScanner extends AbstractClientScanner {
requestBuilder.setRegionName(ByteString.copyFromUtf8(currentRegion.getRegionNameAsString()));
requestBuilder.setScan(ProtobufUtil.toScan(scan));
TrxRegionProtos.OpenScannerRequest openRequest = requestBuilder.build();
+ TrxRegionProtos.OpenScannerResponse response = null;
try {
CoprocessorRpcChannel channel = ttable.coprocessorService(this.currentBeginKey);
TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
- TrxRegionProtos.OpenScannerResponse response = trxService.openScanner(null, openRequest);
- String exception = response.getException();
- if(response.getHasException()) {
- String errMsg = "nextScanner encountered Exception txID: " +
- ts.getTransactionId() + " Exception: " + exception;
- LOG.error(errMsg);
- throw new IOException(errMsg);
- }
+ response = trxService.openScanner(null, openRequest);
this.scannerID = response.getScannerId();
}
- catch(ServiceException se) {
- this.interrupted = true;
- String errMsg = "OpenScanner error encountered Service Exception, scannerID: " + this.scannerID + " " + se;
- LOG.error(errMsg);
- throw new IOException(errMsg);
- }
catch (Throwable e) {
- String errMsg = "OpenScanner error on coprocessor call, scannerID: " + this.scannerID + " " + e;
- LOG.error(errMsg);
- throw new IOException(errMsg);
+ String errMsg = "OpenScanner error on coprocessor call, scannerID: " + this.scannerID ;
+ LOG.error(errMsg, e);
+ throw new IOException(errMsg, e);
}
- this.nextCallSeq = 0;
- if(LOG.isTraceEnabled()) LOG.trace("nextScanner() -- EXIT -- returning true txID: " + ts.getTransactionId());
- return true;
+ if (response.getHasException()) {
+ String exception = response.getException();
+ String errMsg = "nextScanner encountered Exception txID: " +
+ ts.getTransactionId() + " Exception: " + exception;
+ LOG.error(errMsg);
+ throw new IOException(errMsg);
+ }
+ this.nextCallSeq = 0;
+ if(LOG.isTraceEnabled()) LOG.trace("nextScanner() -- EXIT -- returning true txID: " + ts.getTransactionId());
+ return true;
}
@Override
@@ -248,24 +226,19 @@ public class TransactionalScanner extends AbstractClientScanner {
CoprocessorRpcChannel channel = ttable.coprocessorService(this.currentBeginKey);
TrxRegionService.BlockingInterface trxService = TrxRegionService.newBlockingStub(channel);
response = trxService.performScan(null, perfScanRequest);
- String exception = response.getException();
- if(response.getHasException()) {
- String errMsg = "performScan encountered Exception txID: " +
- ts.getTransactionId() + " Exception: " + exception;
- LOG.error(errMsg);
- throw new IOException(errMsg);
- }
- }
- catch (ServiceException se) {
- this.interrupted = true;
- if(LOG.isDebugEnabled()) LOG.debug("PerformScan encountered Service Exception, scannerID: " + this.scannerID + " " + se);
- return null;
}
catch (Throwable e) {
- String errMsg = "PerformScan error on coprocessor call, scannerID: " + this.scannerID + " " + e;
- if(LOG.isErrorEnabled()) LOG.error(errMsg);
- throw new IOException(errMsg);
+ String errMsg = "PerformScan error on coprocessor call, scannerID: " + this.scannerID;
+ if(LOG.isErrorEnabled()) LOG.error(errMsg, e);
+ throw new IOException(errMsg, e);
}
+ if (response.getHasException()) {
+ String exception = response.getException();
+ String errMsg = "performScan encountered Exception txID: " +
+ ts.getTransactionId() + " Exception: " + exception;
+ LOG.error(errMsg);
+ throw new IOException(errMsg);
+ }
int count;
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Result result;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
index b295171..f11b2f6 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
@@ -133,12 +133,6 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
super(tableName, connection, threadPool);
}
- public void resetConnection() throws IOException {
- if (LOG.isDebugEnabled()) LOG.debug("Resetting connection for " + this.getTableDescriptor().getTableName());
- HConnection conn = this.getConnection();
- conn = HConnectionManager.createConnection(this.getConfiguration());
- }
-
private void addLocation(final TransactionState transactionState, HRegionLocation location) {
if (LOG.isTraceEnabled()) LOG.trace("addLocation ENTRY");
if (transactionState.addRegion(location)){
@@ -209,20 +203,8 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
} while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor");
- }
- //Collection<GetTransactionalResponse> results = result.values();
- // Should only be one result, if more than one. Can't handle.
- // Need to test whether '!=' or '>' is correct
- //if (LOG.isTraceEnabled()) LOG.trace("Results count: " + results.size());
- //if(results.size() != 1)
- // throw new IOException("Incorrect number of results from coprocessor call");
- //GetTransactionalResponse[] resultArray = new GetTransactionalResponse[results.size()];
- //results.toArray(resultArray);
- //if(resultArray.length == 0)
- // throw new IOException("Problem with calling coprocessor, no regions returned result");
-
+ throw new IOException("ERROR while calling coprocessor", e);
+ }
if(result == null)
throw new IOException(retryErrMsg);
else if(result.hasException())
@@ -289,14 +271,9 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
retryCount++;
}
} while (retryCount < TransactionalTable.retries && retry == true);
- } catch (ServiceException e) {
- e.printStackTrace();
- throw new IOException();
} catch (Throwable t) {
- t.printStackTrace();
- throw new IOException();
+ throw new IOException("ERROR while calling coprocessor",t);
}
-
if(result == null)
throw new IOException(retryErrMsg);
else if(result.hasException())
@@ -368,8 +345,7 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
} while(retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor");
+ throw new IOException("ERROR while calling coprocessor", e);
}
if(result == null)
throw new IOException(retryErrMsg);
@@ -460,8 +436,7 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
} while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor");
+ throw new IOException("ERROR while calling coprocessor",e);
}
if(result == null)
throw new IOException(retryErrMsg);
@@ -542,10 +517,7 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
} while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- throw new IOException("ERROR while calling coprocessor " + sw.toString());
+ throw new IOException("ERROR while calling coprocessor ",e);
}
if(result == null)
@@ -642,8 +614,7 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
} while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor");
+ throw new IOException("ERROR while calling coprocessor", e);
}
if(result == null)
@@ -737,8 +708,7 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
} while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor");
+ throw new IOException("ERROR while calling coprocessor",e);
}
if(result == null)
throw new IOException(retryErrMsg);
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/IdTmException.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/IdTmException.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/IdTmException.java
index e6051e6..51fb9d3 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/IdTmException.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/IdTmException.java
@@ -37,5 +37,19 @@ public class IdTmException extends Exception {
public IdTmException(String message) {
super(message);
}
+ /**
+ * @param arg0 cause
+ */
+ public IdTmException(Throwable arg0) {
+ super(arg0);
+ }
+
+ /**
+ * @param arg0 cause (the {@code message} argument is the detail message)
+ */
+ public IdTmException(String message, Throwable arg0) {
+ super(message, arg0);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index 9275eeb..4b1c4d8 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -502,13 +502,14 @@ public class HBaseTxClient {
LOG.info("Exit default RET_EXCEPTION prepareCommit, txid: " + transactionId);
return TransReturnCode.RET_EXCEPTION.getShort();
}
- } catch (IOException e) {
- LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException");
- return TransReturnCode.RET_IOEXCEPTION.getShort();
} catch (CommitUnsuccessfulException e) {
LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException");
return TransReturnCode.RET_NOCOMMITEX.getShort();
}
+ catch (IOException e) {
+ LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException");
+ return TransReturnCode.RET_IOEXCEPTION.getShort();
+ }
catch (Exception e) {
LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " Exception " + e);
return TransReturnCode.RET_NOCOMMITEX.getShort();
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sql/src/main/java/org/trafodion/sql/ByteArrayList.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/ByteArrayList.java b/core/sql/src/main/java/org/trafodion/sql/ByteArrayList.java
deleted file mode 100644
index 46b81fe..0000000
--- a/core/sql/src/main/java/org/trafodion/sql/ByteArrayList.java
+++ /dev/null
@@ -1,54 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.sql;
-
-import java.util.ArrayList;
-
-public class ByteArrayList extends ArrayList<byte[]> {
-
- private static final long serialVersionUID = -3557219337406352735L;
-
- void addElement(byte[] ba) {
- add(ba);
- }
-
- byte[] getElement(int i) {
- if (size() == 0)
- return null;
- else if (i < size())
- return get(i);
- else
- return null;
- }
-
- int getSize() {
- return size();
- }
-
- int getEntrySize(int i) {
- return get(i).length;
- }
-
- byte[] getEntry(int i) {
- return get(i);
- }
-}
[3/4] incubator-trafodion git commit: [TRAFODION-1988] Better Java
exception handling in Trafodion
Posted by db...@apache.org.
[TRAFODION-1988] Better Java exception handling in Trafodion
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/34abbbe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/34abbbe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/34abbbe7
Branch: refs/heads/master
Commit: 34abbbe70de21475d3a9e27ff74b7c290bd617c6
Parents: dcfc8e6
Author: selvaganesang <se...@esgyn.com>
Authored: Wed May 11 21:58:55 2016 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Thu May 12 03:06:46 2016 +0000
----------------------------------------------------------------------
.../hbase/client/transactional/RMInterface.java | 43 +-
.../RetryTransactionException.java | 54 ++
.../RollbackUnsuccessfulException.java | 62 ++
.../transactional/SsccTransactionalTable.java | 34 +-
.../hbase/client/transactional/TmDDL.java | 24 +-
.../transactional/TransactionManager.java | 626 ++++++++-----------
.../TransactionalScanner.java.tmpl | 97 ++-
.../transactional/TransactionalTable.java | 46 +-
.../transactional/IdTmException.java | 14 +
.../java/org/trafodion/dtm/HBaseTxClient.java | 7 +-
.../java/org/trafodion/sql/ByteArrayList.java | 54 --
.../java/org/trafodion/sql/HBaseClient.java | 76 +--
.../java/org/trafodion/sql/HBulkLoadClient.java | 42 +-
.../java/org/trafodion/sql/HTableClient.java | 18 +-
.../main/java/org/trafodion/sql/HiveClient.java | 59 +-
.../java/org/trafodion/sql/ResultIterator.java | 133 ----
.../org/trafodion/sql/ResultKeyValueList.java | 100 ---
.../java/org/trafodion/sql/RowToInsert.java | 44 --
.../java/org/trafodion/sql/RowsToInsert.java | 57 --
.../org/trafodion/sql/SequenceFileWriter.java | 129 +---
.../java/org/trafodion/sql/StringArrayList.java | 47 --
21 files changed, 515 insertions(+), 1251 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
index a0e92ff..df74a45 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
@@ -163,8 +163,8 @@ public class RMInterface {
idServer.id(ID_TM_SERVER_TIMEOUT, startId);
if (LOG.isTraceEnabled()) LOG.trace("registerTransaction idServer.id returned: " + startId.val);
} catch (IdTmException exc) {
- LOG.error("registerTransaction: IdTm threw exception " + exc);
- throw new IOException("registerTransaction: IdTm threw exception " + exc);
+ LOG.error("registerTransaction: IdTm threw exception " , exc);
+ throw new IOException("registerTransaction: IdTm threw exception ", exc);
}
startIdVal = startId.val;
}
@@ -224,63 +224,30 @@ public class RMInterface {
public void createTable(HTableDescriptor desc, byte[][] keys, int numSplits, int keyLength, long transID) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("createTable ENTER: ");
-
- try {
byte[] lv_byte_desc = desc.toByteArray();
byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc));
createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname);
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("Unable to createTable or convert table descriptor to byte array " + e);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- LOG.error("desc.ByteArray error " + sw.toString());
- throw new IOException("createTable exception. Unable to create table.");
- }
}
public void truncateTableOnAbort(String tblName, long transID) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("truncateTableOnAbort ENTER: ");
-
- try {
byte[] lv_byte_tblName = tblName.getBytes();
truncateOnAbortReq(lv_byte_tblName, transID);
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("Unable to truncateTableOnAbort" + e);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- LOG.error("truncateTableOnAbort error: " + sw.toString());
- throw new IOException("truncateTableOnAbort exception. Unable to create table.");
- }
}
public void dropTable(String tblName, long transID) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("dropTable ENTER: ");
- try {
byte[] lv_byte_tblname = tblName.getBytes();
dropTableReq(lv_byte_tblname, transID);
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("Unable to dropTable " + e);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- LOG.error("dropTable error " + sw.toString());
- }
}
public void alter(String tblName, Object[] tableOptions, long transID) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("alter ENTER: ");
- try {
byte[] lv_byte_tblname = tblName.getBytes();
alterTableReq(lv_byte_tblname, tableOptions, transID);
- } catch (Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("Unable to alter table, exception: " + e);
- throw new IOException("alter exception. Unable to create table.");
- }
}
static public void clearTransactionStates(final long transactionID) {
@@ -294,13 +261,7 @@ public class RMInterface {
static public synchronized void unregisterTransaction(final long transactionID) {
TransactionState ts = null;
if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction txid: " + transactionID);
- try {
ts = mapTransactionStates.remove(transactionID);
- } catch (Exception e) {
- LOG.warn("Ignoring exception. mapTransactionStates.remove for transid " + transactionID +
- " failed with exception " + e);
- return;
- }
if (ts == null) {
LOG.warn("mapTransactionStates.remove did not find transid " + transactionID);
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RetryTransactionException.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RetryTransactionException.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RetryTransactionException.java
new file mode 100644
index 0000000..e767bb3
--- /dev/null
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RetryTransactionException.java
@@ -0,0 +1,54 @@
+/**
+* @@@ START COPYRIGHT @@@
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+* @@@ END COPYRIGHT @@@
+**/
+
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+/**
+ * Thrown to signal that a transactional operation should be retried (inferred from the class name; confirm against callers)
+ */
+public class RetryTransactionException extends IOException {
+
+ private static final long serialVersionUID = 698575374929591089L;
+
+ /** constructor */
+ public RetryTransactionException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public RetryTransactionException(String s) {
+ super(s);
+ }
+
+ /**
+ * Constructor that also records {@code t} as the cause
+ * @param s message
+ */
+ public RetryTransactionException(String s, Throwable t) {
+ super(s, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RollbackUnsuccessfulException.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RollbackUnsuccessfulException.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RollbackUnsuccessfulException.java
new file mode 100644
index 0000000..fbf8d6d
--- /dev/null
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RollbackUnsuccessfulException.java
@@ -0,0 +1,62 @@
+/**
+* @@@ START COPYRIGHT @@@
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+* @@@ END COPYRIGHT @@@
+**/
+
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+/** Thrown when a transaction cannot be rolled back.
+ *
+ */
+public class RollbackUnsuccessfulException extends IOException {
+
+ private static final long serialVersionUID = 7062921444531109202L;
+
+ /** Default Constructor */
+ public RollbackUnsuccessfulException() {
+ super();
+ }
+
+ /**
+ * @param arg0 message
+ * @param arg1 cause
+ */
+ public RollbackUnsuccessfulException(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
+
+ /**
+ * @param arg0 message
+ */
+ public RollbackUnsuccessfulException(String arg0) {
+ super(arg0);
+ }
+
+ /**
+ * @param arg0 cause
+ */
+ public RollbackUnsuccessfulException(Throwable arg0) {
+ super(arg0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
index 31e6d1b..3735b63 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
@@ -157,13 +157,6 @@ public class SsccTransactionalTable extends HTable implements TransactionalTable
super( tableName,connection, threadPool);
}
- public void resetConnection() throws IOException {
- if (LOG.isDebugEnabled()) LOG.debug("Resetting connection for " + this.getTableDescriptor().getTableName());
- HConnection conn = this.getConnection();
- conn = HConnectionManager.createConnection(this.getConfiguration());
- }
-
-
private void addLocation(final TransactionState transactionState, HRegionLocation location) {
if (LOG.isTraceEnabled()) LOG.trace("addLocation ENTRY");
if (transactionState.addRegion(location)){
@@ -215,8 +208,7 @@ public class SsccTransactionalTable extends HTable implements TransactionalTable
try {
result = super.coprocessorService(SsccRegionService.class, get.getRow(), get.getRow(), callable);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor get");
+ throw new IOException("ERROR while calling coprocessor get", e);
}
Collection<SsccGetTransactionalResponse> results = result.values();
// Should only be one result, if more than one. Can't handle.
@@ -275,12 +267,8 @@ public class SsccTransactionalTable extends HTable implements TransactionalTable
try {
result = super.coprocessorService(SsccRegionService.class, row, row, callable);
- } catch (ServiceException e) {
- e.printStackTrace();
- throw new IOException();
} catch (Throwable t) {
- t.printStackTrace();
- throw new IOException();
+ throw new IOException("ERROR while calling coprocessor delete ",t);
}
Collection<SsccDeleteTransactionalResponse> results = result.values();
//GetTransactionalResponse[] resultArray = (GetTransactionalResponse[]) results.toArray();
@@ -342,8 +330,7 @@ public class SsccTransactionalTable extends HTable implements TransactionalTable
try {
result = super.coprocessorService(SsccRegionService.class, put.getRow(), put.getRow(), callable);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor put " + e);
+ throw new IOException("ERROR while calling coprocessor put ", e);
}
Collection<SsccPutTransactionalResponse> results = result.values();
SsccPutTransactionalResponse[] resultArray = new SsccPutTransactionalResponse[results.size()];
@@ -425,8 +412,7 @@ public class SsccTransactionalTable extends HTable implements TransactionalTable
try {
result = super.coprocessorService(SsccRegionService.class, delete.getRow(), delete.getRow(), callable);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor checkAndDelete");
+ throw new IOException("ERROR while calling coprocessor checkAndDelete", e);
}
Collection<SsccCheckAndDeleteResponse> results = result.values();
@@ -504,11 +490,7 @@ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut, seting request startid: " + tr
try {
result = super.coprocessorService(SsccRegionService.class, put.getRow(), put.getRow(), callable);
} catch (Throwable e) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- //sw.toString();
- throw new IOException("ERROR while calling coprocessor checkAndPut" + sw.toString());
+ throw new IOException("ERROR while calling coprocessor checkAndPut", e);
}
Collection<SsccCheckAndPutResponse> results = result.values();
// Should only be one result, if more than one. Can't handle.
@@ -581,8 +563,7 @@ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut, seting request startid: " + tr
entry.getValue().get(0).getRow(),
callable);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor delete");
+ throw new IOException("ERROR while calling coprocessor delete", e);
}
if(result.size() > 1) {
LOG.error("result size for multiple delete:" + result.size());
@@ -660,8 +641,7 @@ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut, seting request startid: " + tr
entry.getValue().get(0).getRow(),
callable);
} catch (Throwable e) {
- e.printStackTrace();
- throw new IOException("ERROR while calling coprocessor put");
+ throw new IOException("ERROR while calling coprocessor put", e);
}
Collection<SsccPutMultipleTransactionalResponse> results = result.values();
SsccPutMultipleTransactionalResponse[] resultArray = new SsccPutMultipleTransactionalResponse[results.size()];
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/34abbbe7/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
index d0922e5..783b70c 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
@@ -103,7 +103,7 @@ public class TmDDL {
table = new HTable(config, tablename);
}
- public void putRow(final long transid, final String Operation, final String tableName) throws Exception {
+ public void putRow(final long transid, final String Operation, final String tableName) throws IOException {
long threadId = Thread.currentThread().getId();
if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow Operation, TxID: " + transid + "Thread ID:" + threadId
@@ -116,14 +116,7 @@ public class TmDDL {
//Retrieve the row if it exists so we can append.
Get g = new Get(Bytes.toBytes(transid));
- try {
- r = table.get(g);
- }
- catch(Exception e){
- LOG.error("TmDDL putRow method, Get Exception TxID:" + transid + "Exception:" + e);
- throw e;
- }
-
+ r = table.get(g);
//Check and set State
if(! r.isEmpty())
{
@@ -216,24 +209,11 @@ public class TmDDL {
}
- try {
synchronized (tablePutLock) {
- try {
if (LOG.isTraceEnabled()) LOG.trace("TmDDL table.put, TxID: " + transid + "Put :" + p );
table.put(p);
- }
- catch (Exception e2){
- //Avoiding logging within a lock. Throwing Exception.
- throw e2;
- }
} // End global synchronization
- }
- catch (Exception e) {
- //create record of the exception
- LOG.error("TmDDL tablePutLock or Table.put Exception, TxID: " + transid + "Exception:" + e);
- throw e;
- }
if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow exit, TxId:" + transid);
}
[4/4] incubator-trafodion git commit: Merge [TRAFODION-1988] PR 479
Better Java exception handling in Trafodion
Posted by db...@apache.org.
Merge [TRAFODION-1988] PR 479 Better Java exception handling in Trafodion
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/33bea22b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/33bea22b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/33bea22b
Branch: refs/heads/master
Commit: 33bea22b19218a9af73a4ff8d97209e80762a6d3
Parents: 71edb09 34abbbe
Author: Dave Birdsall <db...@apache.org>
Authored: Wed May 18 18:04:34 2016 +0000
Committer: Dave Birdsall <db...@apache.org>
Committed: Wed May 18 18:04:34 2016 +0000
----------------------------------------------------------------------
.../hbase/client/transactional/RMInterface.java | 43 +-
.../RetryTransactionException.java | 54 ++
.../RollbackUnsuccessfulException.java | 62 ++
.../transactional/SsccTransactionalTable.java | 34 +-
.../hbase/client/transactional/TmDDL.java | 24 +-
.../transactional/TransactionManager.java | 626 ++++++++-----------
.../TransactionalScanner.java.tmpl | 97 ++-
.../transactional/TransactionalTable.java | 46 +-
.../transactional/IdTmException.java | 14 +
.../java/org/trafodion/dtm/HBaseTxClient.java | 7 +-
.../java/org/trafodion/sql/ByteArrayList.java | 54 --
.../java/org/trafodion/sql/HBaseClient.java | 76 +--
.../java/org/trafodion/sql/HBulkLoadClient.java | 42 +-
.../java/org/trafodion/sql/HTableClient.java | 18 +-
.../main/java/org/trafodion/sql/HiveClient.java | 59 +-
.../java/org/trafodion/sql/ResultIterator.java | 133 ----
.../org/trafodion/sql/ResultKeyValueList.java | 100 ---
.../java/org/trafodion/sql/RowToInsert.java | 44 --
.../java/org/trafodion/sql/RowsToInsert.java | 57 --
.../org/trafodion/sql/SequenceFileWriter.java | 129 +---
.../java/org/trafodion/sql/StringArrayList.java | 47 --
21 files changed, 515 insertions(+), 1251 deletions(-)
----------------------------------------------------------------------