Posted to commits@trafodion.apache.org by db...@apache.org on 2015/10/02 18:16:33 UTC

[4/9] incubator-trafodion git commit: Most of the Trafodion Java source files are built through Maven, using projects DCS, REST, HBase-trx and SQL. A few files remain in the core/sql/executor and core/sql/ustat directories that are built through javac co

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a44823fe/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java b/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java
new file mode 100644
index 0000000..36c4e05
--- /dev/null
+++ b/core/sql/src/main/java/org/trafodion/sql/HBaseClient.java
@@ -0,0 +1,1596 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.NavigableMap;
+import java.util.Map;
+import java.util.Arrays;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.transactional.RMInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.PoolMap.PoolType;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
+//import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+//import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.BloomType; 
+//import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType ;
+import org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy;
+import org.apache.hadoop.hbase.client.Durability;
+import org.trafodion.sql.HTableClient;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.ServerName;
+
+import java.util.concurrent.ExecutionException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.DtmConst;
+import org.apache.commons.codec.binary.Hex;
+
+public class HBaseClient {
+
+    static Logger logger = Logger.getLogger(HBaseClient.class.getName());
+    public static Configuration config = HBaseConfiguration.create();
+    String lastError;
+    RMInterface table = null;
+
+    private PoolMap<String, HTableClient> hTableClientsFree;
+    private PoolMap<String, HTableClient> hTableClientsInUse;
+    // this set of constants MUST be kept in sync with the C++ enum in
+    // ExpHbaseDefs.h
+    public static final int HBASE_NAME = 0;
+    public static final int HBASE_MAX_VERSIONS = 1;
+    public static final int HBASE_MIN_VERSIONS = 2;
+    public static final int HBASE_TTL = 3;
+    public static final int HBASE_BLOCKCACHE = 4;
+    public static final int HBASE_IN_MEMORY = 5;
+    public static final int HBASE_COMPRESSION = 6;
+    public static final int HBASE_BLOOMFILTER = 7;
+    public static final int HBASE_BLOCKSIZE = 8;
+    public static final int HBASE_DATA_BLOCK_ENCODING = 9;
+    public static final int HBASE_CACHE_BLOOMS_ON_WRITE = 10;
+    public static final int HBASE_CACHE_DATA_ON_WRITE = 11;
+    public static final int HBASE_CACHE_INDEXES_ON_WRITE = 12;
+    public static final int HBASE_COMPACT_COMPRESSION = 13;
+    public static final int HBASE_PREFIX_LENGTH_KEY = 14;
+    public static final int HBASE_EVICT_BLOCKS_ON_CLOSE = 15;
+    public static final int HBASE_KEEP_DELETED_CELLS = 16;
+    public static final int HBASE_REPLICATION_SCOPE = 17;
+    public static final int HBASE_MAX_FILESIZE = 18;
+    public static final int HBASE_COMPACT = 19;
+    public static final int HBASE_DURABILITY = 20;
+    public static final int HBASE_MEMSTORE_FLUSH_SIZE = 21;
+    public static final int HBASE_SPLIT_POLICY = 22;
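+
+    // For illustration (an assumed usage sketch, mirroring the switch in
+    // setDescriptors below): callers pass a tableOptions Object[] indexed by
+    // these constants, with an empty string meaning "not specified", e.g.
+    //
+    //   Object[] tableOptions = new Object[HBASE_SPLIT_POLICY + 1];
+    //   java.util.Arrays.fill(tableOptions, "");
+    //   tableOptions[HBASE_NAME]         = "cf1";     // column family name(s)
+    //   tableOptions[HBASE_MAX_VERSIONS] = "3";
+    //   tableOptions[HBASE_COMPRESSION]  = "SNAPPY";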
+
+    
+    public HBaseClient() {
+      hTableClientsFree = new PoolMap<String, HTableClient>
+              (PoolType.Reusable, Integer.MAX_VALUE);
+      hTableClientsInUse = new PoolMap<String, HTableClient>
+               (PoolType.Reusable, Integer.MAX_VALUE);
+    }
+
+    public String getLastError() {
+        return lastError;
+    }
+
+    void setLastError(String err) {
+        lastError = err;
+    }
+
+    static {
+    	//Some clients of this class, e.g., DcsServer/JdbcT2,
+    	//want to use their own log4j.properties file instead
+    	//of the /conf/log4j.hdfs.config so they can see their
+    	//log events in their own log files or console.
+    	//So, check for an alternate log4j.properties; otherwise
+    	//use the default HBaseClient config.
+    	String confFile = System.getProperty("hbaseclient.log4j.properties");
+    	if(confFile == null) {
+    		System.setProperty("trafodion.hdfs.log", System.getenv("MY_SQROOT") + "/logs/trafodion.hdfs.log");
+    		confFile = System.getenv("MY_SQROOT") + "/conf/log4j.hdfs.config";
+    	}
+    	PropertyConfigurator.configure(confFile);
+    }
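+
+    // A minimal sketch of overriding the logging config (assuming the JVM is
+    // launched with the -D system property checked above):
+    //
+    //   java -Dhbaseclient.log4j.properties=/my/app/log4j.properties ...
+    //
+    // When the property is absent, $MY_SQROOT/conf/log4j.hdfs.config is used
+    // and HDFS log output goes to $MY_SQROOT/logs/trafodion.hdfs.log.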
+
+    public boolean init(String zkServers, String zkPort) 
+	throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException
+    {
+        if (logger.isDebugEnabled()) logger.debug("HBaseClient.init(" + zkServers + ", " + zkPort
+                         + ") called.");
+        HBaseAdmin.checkHBaseAvailable(config);
+
+        try {
+            table = new RMInterface();
+        } catch (Exception e) {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.init: Error in RMInterface instance creation.");
+        }
+        
+        return true;
+    }
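+
+    // Typical lifecycle, as a sketch (the ZooKeeper quorum, port and table
+    // name below are hypothetical; getHTableClient() may return null):
+    //
+    //   HBaseClient client = new HBaseClient();
+    //   client.init("zkhost1,zkhost2,zkhost3", "2181");
+    //   HTableClient htc = client.getHTableClient(0 /*jniObject*/,
+    //                                             "TRAFODION.SCH.T1",
+    //                                             false /*useTRex*/);
+    //   // ... use htc ...
+    //   client.releaseHTableClient(htc);
+    //   client.cleanup();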
+ 
+    private void  cleanup(PoolMap hTableClientsPool) throws IOException
+    {
+       Collection hTableClients;
+       Iterator<HTableClient> iter;
+       HTableClient htable;
+       boolean clearRegionCache = false;
+       boolean cleanJniObject = true;
+
+       hTableClients = hTableClientsPool.values();
+       iter = hTableClients.iterator();
+       while (iter.hasNext())
+       {
+         htable = iter.next();
+         htable.close(clearRegionCache, cleanJniObject);          
+       }
+       hTableClientsPool.clear();
+    }
+
+    public boolean cleanup() throws IOException {
+       cleanup(hTableClientsInUse);
+       cleanup(hTableClientsFree);
+       return true;
+    }
+
+   public void cleanupCache(Collection hTableClients) throws IOException
+    {
+       Iterator<HTableClient> iter;
+       HTableClient htable;
+       boolean clearRegionCache = true;
+       boolean cleanJniObject = false;
+ 
+       iter = hTableClients.iterator();
+       while (iter.hasNext())
+       {
+          htable = iter.next();
+          htable.close(clearRegionCache, cleanJniObject);     
+       }
+    }
+
+    public boolean cleanupCache(String tblName) throws IOException
+    {
+       Collection hTableClients;
+       hTableClients = hTableClientsFree.values(tblName);
+       cleanupCache(hTableClients);  
+       hTableClientsFree.remove(tblName);
+       hTableClients = hTableClientsInUse.values(tblName);
+       cleanupCache(hTableClients);  
+       hTableClientsInUse.remove(tblName);
+       return true;
+    }
+
+    public boolean create(String tblName, Object[]  colFamNameList,
+                          boolean isMVCC) 
+        throws IOException, MasterNotRunningException {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.create(" + tblName + ") called, and MVCC is " + isMVCC + ".");
+            cleanupCache(tblName);
+            HTableDescriptor desc = new HTableDescriptor(tblName);
+            for (int i = 0; i < colFamNameList.length ; i++) {
+		String  colFam = (String)colFamNameList[i];
+                HColumnDescriptor colDesc = new HColumnDescriptor(colFam);
+                if (isMVCC)
+                  colDesc.setMaxVersions(DtmConst.MVCC_MAX_VERSION);
+                else
+                  colDesc.setMaxVersions(DtmConst.SSCC_MAX_VERSION);
+                desc.addFamily(colDesc);
+            }
+            HColumnDescriptor metaColDesc = new HColumnDescriptor(DtmConst.TRANSACTION_META_FAMILY);
+            if (isMVCC)
+              metaColDesc.setMaxVersions(DtmConst.MVCC_MAX_DATA_VERSION);
+            else
+              metaColDesc.setMaxVersions(DtmConst.SSCC_MAX_DATA_VERSION);
+            metaColDesc.setInMemory(true);
+            desc.addFamily(metaColDesc);
+            HBaseAdmin admin = new HBaseAdmin(config);
+            admin.createTable(desc);
+            admin.close();
+            return true;
+   } 
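+
+   // Example call, as a sketch (table and family names are hypothetical):
+   //
+   //   client.create("TRAFODION.SEABASE.MYTABLE",
+   //                 new Object[] { "#1" },  // one column family
+   //                 true);                  // MVCC transaction mode
+   //
+   // Note that the TRANSACTION_META_FAMILY column family is always added
+   // alongside the requested families, with max versions set per the
+   // MVCC/SSCC mode.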
+
+   // used for returning two flags from setDescriptors method
+
+   private class ChangeFlags {
+       boolean tableDescriptorChanged;
+       boolean columnDescriptorChanged;
+
+       ChangeFlags() {
+           tableDescriptorChanged = false;
+           columnDescriptorChanged = false;
+       }
+
+       void setTableDescriptorChanged() {
+           tableDescriptorChanged = true;
+       }
+
+       void setColumnDescriptorChanged() {
+           columnDescriptorChanged = true;
+       }
+
+       boolean tableDescriptorChanged() {
+           return tableDescriptorChanged;
+       }
+
+       boolean columnDescriptorChanged() {
+           return columnDescriptorChanged;
+       }
+   }
+
+   private ChangeFlags setDescriptors(Object[] tableOptions,
+                                      HTableDescriptor desc,
+                                      HColumnDescriptor colDesc,
+                                      int defaultVersionsValue) {
+       ChangeFlags returnStatus = new ChangeFlags();
+       String trueStr = "TRUE";
+       for (int i = 0; i < tableOptions.length; i++) {
+           if (i == HBASE_NAME)	
+               continue ;
+           String tableOption = (String)tableOptions[i];
+           if ((i != HBASE_MAX_VERSIONS) && (tableOption.isEmpty()))
+               continue ;
+           switch (i) {
+           case HBASE_MAX_VERSIONS:
+               if (tableOption.isEmpty()) {
+                   if (colDesc.getMaxVersions() != defaultVersionsValue) {
+                       colDesc.setMaxVersions(defaultVersionsValue);
+                       returnStatus.setColumnDescriptorChanged();
+                   }
+               }
+               else {
+                   colDesc.setMaxVersions
+                       (Integer.parseInt(tableOption));
+                   returnStatus.setColumnDescriptorChanged();
+               }
+               break ;
+           case HBASE_MIN_VERSIONS:
+               colDesc.setMinVersions
+                   (Integer.parseInt(tableOption));
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_TTL:
+               colDesc.setTimeToLive
+                   (Integer.parseInt(tableOption));
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_BLOCKCACHE:
+               if (tableOption.equalsIgnoreCase(trueStr))
+                   colDesc.setBlockCacheEnabled(true);
+               else
+                   colDesc.setBlockCacheEnabled(false);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_IN_MEMORY:
+               if (tableOption.equalsIgnoreCase(trueStr))
+                   colDesc.setInMemory(true);
+               else
+                   colDesc.setInMemory(false);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_COMPRESSION:
+               if (tableOption.equalsIgnoreCase("GZ"))
+                   colDesc.setCompressionType(Algorithm.GZ);
+               else if (tableOption.equalsIgnoreCase("LZ4"))
+                   colDesc.setCompressionType(Algorithm.LZ4);
+               else if (tableOption.equalsIgnoreCase("LZO"))
+                   colDesc.setCompressionType(Algorithm.LZO);
+               else if (tableOption.equalsIgnoreCase("NONE"))
+                   colDesc.setCompressionType(Algorithm.NONE);
+               else if (tableOption.equalsIgnoreCase("SNAPPY"))
+                   colDesc.setCompressionType(Algorithm.SNAPPY); 
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_BLOOMFILTER:
+               if (tableOption.equalsIgnoreCase("NONE"))
+                   colDesc.setBloomFilterType(BloomType.NONE);
+               else if (tableOption.equalsIgnoreCase("ROW"))
+                   colDesc.setBloomFilterType(BloomType.ROW);
+               else if (tableOption.equalsIgnoreCase("ROWCOL"))
+                   colDesc.setBloomFilterType(BloomType.ROWCOL); 
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_BLOCKSIZE:
+               colDesc.setBlocksize
+                   (Integer.parseInt(tableOption));
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_DATA_BLOCK_ENCODING:
+               if (tableOption.equalsIgnoreCase("DIFF"))
+                   colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);
+               else if (tableOption.equalsIgnoreCase("FAST_DIFF"))
+                   colDesc.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+               else if (tableOption.equalsIgnoreCase("NONE"))
+                   colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
+               else if (tableOption.equalsIgnoreCase("PREFIX"))
+                   colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX);
+               else if (tableOption.equalsIgnoreCase("PREFIX_TREE"))
+                   colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_CACHE_BLOOMS_ON_WRITE:
+               if (tableOption.equalsIgnoreCase(trueStr))
+                   colDesc.setCacheBloomsOnWrite(true);
+               else
+                   colDesc.setCacheBloomsOnWrite(false);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_CACHE_DATA_ON_WRITE:
+               if (tableOption.equalsIgnoreCase(trueStr))
+                   colDesc.setCacheDataOnWrite(true);
+               else
+                   colDesc.setCacheDataOnWrite(false);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_CACHE_INDEXES_ON_WRITE:
+               if (tableOption.equalsIgnoreCase(trueStr))
+                   colDesc.setCacheIndexesOnWrite(true);
+               else
+                   colDesc.setCacheIndexesOnWrite(false);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_COMPACT_COMPRESSION:
+               if (tableOption.equalsIgnoreCase("GZ"))
+                   colDesc.setCompactionCompressionType(Algorithm.GZ);
+               else if (tableOption.equalsIgnoreCase("LZ4"))
+                   colDesc.setCompactionCompressionType(Algorithm.LZ4);
+               else if (tableOption.equalsIgnoreCase("LZO"))
+                   colDesc.setCompactionCompressionType(Algorithm.LZO);
+               else if (tableOption.equalsIgnoreCase("NONE"))
+                   colDesc.setCompactionCompressionType(Algorithm.NONE);
+               else if (tableOption.equalsIgnoreCase("SNAPPY"))
+                   colDesc.setCompactionCompressionType(Algorithm.SNAPPY); 
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_PREFIX_LENGTH_KEY:
+               desc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY,
+                             tableOption);
+               returnStatus.setTableDescriptorChanged();
+               break ;
+           case HBASE_EVICT_BLOCKS_ON_CLOSE:
+               if (tableOption.equalsIgnoreCase(trueStr))
+                   colDesc.setEvictBlocksOnClose(true);
+               else
+                   colDesc.setEvictBlocksOnClose(false);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_KEEP_DELETED_CELLS:
+               if (tableOption.equalsIgnoreCase(trueStr))
+                   colDesc.setKeepDeletedCells(true);
+               else
+                   colDesc.setKeepDeletedCells(false);
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_REPLICATION_SCOPE:
+               colDesc.setScope
+                   (Integer.parseInt(tableOption));
+               returnStatus.setColumnDescriptorChanged();
+               break ;
+           case HBASE_MAX_FILESIZE:
+               desc.setMaxFileSize
+                   (Long.parseLong(tableOption));
+               returnStatus.setTableDescriptorChanged();
+               break ;
+           case HBASE_COMPACT:
+              if (tableOption.equalsIgnoreCase(trueStr))
+                   desc.setCompactionEnabled(true);
+               else
+                   desc.setCompactionEnabled(false); 
+               returnStatus.setTableDescriptorChanged();
+               break ;
+           case HBASE_DURABILITY:
+               if (tableOption.equalsIgnoreCase("ASYNC_WAL"))
+                   desc.setDurability(Durability.ASYNC_WAL);
+               else if (tableOption.equalsIgnoreCase("FSYNC_WAL"))
+                   desc.setDurability(Durability.FSYNC_WAL);
+               else if (tableOption.equalsIgnoreCase("SKIP_WAL"))
+                   desc.setDurability(Durability.SKIP_WAL);
+               else if (tableOption.equalsIgnoreCase("SYNC_WAL"))
+                   desc.setDurability(Durability.SYNC_WAL);
+               else if (tableOption.equalsIgnoreCase("USE_DEFAULT"))
+                   desc.setDurability(Durability.USE_DEFAULT);
+               returnStatus.setTableDescriptorChanged(); 
+               break ;
+           case HBASE_MEMSTORE_FLUSH_SIZE:
+               desc.setMemStoreFlushSize
+                   (Long.parseLong(tableOption));
+               returnStatus.setTableDescriptorChanged();
+               break ;
+           case HBASE_SPLIT_POLICY:
+               // This method not yet available in earlier versions
+               // desc.setRegionSplitPolicyClassName(tableOption));
+               desc.setValue(HTableDescriptor.SPLIT_POLICY, tableOption);
+               returnStatus.setTableDescriptorChanged();
+               break ;
+           default:
+               break;
+           }
+       }
+
+       return returnStatus;
+   }
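+
+   // Worked example, as a sketch: if tableOptions[HBASE_MAX_FILESIZE] is
+   // "10737418240" and every other option is empty, only the table descriptor
+   // is modified, so the returned ChangeFlags has tableDescriptorChanged() ==
+   // true and columnDescriptorChanged() == false; alter() below then calls
+   // admin.modifyTable() rather than admin.modifyColumn().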
+   
+
+   public boolean createk(String tblName, Object[] tableOptions,
+       Object[]  beginEndKeys, long transID, int numSplits, int keyLength,
+       boolean isMVCC)
+       throws IOException, MasterNotRunningException {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.createk(" + tblName + ") called.");
+            String trueStr = "TRUE";
+            cleanupCache(tblName);
+            HTableDescriptor desc = new HTableDescriptor(tblName);
+
+            int defaultVersionsValue = 0;
+            if (isMVCC)
+                defaultVersionsValue = DtmConst.MVCC_MAX_VERSION;
+            else
+                defaultVersionsValue = DtmConst.SSCC_MAX_VERSION;
+
+            // Column family names are passed as a space-delimited list.
+            // Extract all family names and add them to the table descriptor.
+            // All other default and specified options remain the same for all families.
+            String colFamsStr = (String)tableOptions[HBASE_NAME];
+            String[] colFamsArr = colFamsStr.split("\\s+"); 
+
+            for (int i = 0; i < colFamsArr.length; i++){            
+                String colFam = colFamsArr[i];
+
+                HColumnDescriptor colDesc = new HColumnDescriptor(colFam);
+
+                // change the descriptors based on the tableOptions; 
+                setDescriptors(tableOptions,desc /*out*/,colDesc /*out*/, defaultVersionsValue);
+                
+                desc.addFamily(colDesc);
+            }
+
+            HColumnDescriptor metaColDesc = new HColumnDescriptor(DtmConst.TRANSACTION_META_FAMILY);
+            if (isMVCC)
+              metaColDesc.setMaxVersions(DtmConst.MVCC_MAX_DATA_VERSION);
+            else
+              metaColDesc.setMaxVersions(DtmConst.SSCC_MAX_DATA_VERSION);
+            metaColDesc.setInMemory(true);
+            desc.addFamily(metaColDesc);
+            HBaseAdmin admin = new HBaseAdmin(config);
+
+            try {
+               if (beginEndKeys != null && beginEndKeys.length > 0)
+               {
+                  byte[][] keys = new byte[beginEndKeys.length][];
+                  for (int i = 0; i < beginEndKeys.length; i++){
+                     keys[i] = (byte[])beginEndKeys[i]; 
+                     if (logger.isDebugEnabled()) logger.debug("HBaseClient.createk key #" + i + " value " + keys[i] + ".");
+                  }
+                  if (transID != 0) {
+                     table.createTable(desc, keys, numSplits, keyLength, transID);
+                     if (logger.isDebugEnabled()) logger.debug("HBaseClient.createk beginEndKeys(" + beginEndKeys + ") called.");
+                  } else {
+                     admin.createTable(desc, keys);
+                  }
+               }
+               else {
+                  if (transID != 0) {
+                     table.createTable(desc, null, numSplits, keyLength, transID);
+                  } else {
+                     admin.createTable(desc);
+                  }
+               }
+            }
+            catch (IOException e)
+            {
+               if (logger.isDebugEnabled()) logger.debug("HBaseClient.createk: createTable error " + e);
+               throw e;
+            }
+            admin.close();
+        return true;
+    }
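+
+    // Example, as a sketch: because tableOptions[HBASE_NAME] is a
+    // space-delimited family list, a value of "cf1 cf2" creates two column
+    // families that share the remaining options. A hypothetical pre-split
+    // into four regions on single-byte boundaries might pass
+    //
+    //   Object[] beginEndKeys = new Object[] {
+    //       new byte[]{0x40}, new byte[]{(byte)0x80}, new byte[]{(byte)0xC0} };
+    //   client.createk(tblName, tableOptions, beginEndKeys,
+    //                  0 /*no transaction*/, 4, keyLength, true /*MVCC*/);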
+
+    public boolean registerTruncateOnAbort(String tblName, long transID)
+        throws MasterNotRunningException, IOException {
+
+        try {
+           if(transID != 0) {
+              table.truncateTableOnAbort(tblName, transID);
+           }
+        }
+        catch (IOException e) {
+           if (logger.isDebugEnabled()) logger.debug("HBaseClient.registerTruncateOnAbort error " + e);
+           throw e;
+        }
+        return true;
+    }
+
+    private void waitForCompletion(String tblName,HBaseAdmin admin) 
+        throws IOException {
+        // poll for completion of an asynchronous operation
+        boolean keepPolling = true;
+        while (keepPolling) {
+            // status.getFirst() returns the number of regions yet to be updated
+            // status.getSecond() returns the total number of regions
+            Pair<Integer,Integer> status = admin.getAlterStatus(tblName.getBytes());
+
+            keepPolling = (status.getFirst() > 0) && (status.getSecond() > 0);
+            if (keepPolling) {
+                try {
+                    Thread.sleep(2000);  // sleep two seconds or until interrupted
+                }
+                catch (InterruptedException e) {
+                    // ignore the interruption and keep going
+                }    
+            }
+        }
+    }
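+
+    // For example, a status pair of (2, 5) means 2 of the table's 5 regions
+    // have not yet picked up the new schema, so polling continues; once the
+    // pair reaches (0, 5) the loop exits.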
+
+    public boolean alter(String tblName, Object[] tableOptions, long transID)
+        throws IOException, MasterNotRunningException {
+
+        if (logger.isDebugEnabled()) logger.debug("HBaseClient.alter(" + tblName + ") called.");
+        cleanupCache(tblName);
+        HBaseAdmin admin = new HBaseAdmin(config);
+        HTableDescriptor htblDesc = admin.getTableDescriptor(tblName.getBytes());       
+        HColumnDescriptor[] families = htblDesc.getColumnFamilies();
+
+        String colFam = (String)tableOptions[HBASE_NAME];
+        if (colFam == null)
+            return true; // must have col fam name
+
+        // If the only option specified is the column family name and this
+        // family doesn't already exist, then add it.
+        boolean onlyColFamOptionSpecified = true;
+        for (int i = 0; (onlyColFamOptionSpecified && (i < tableOptions.length)); i++) {
+            if (i == HBASE_NAME)	
+                continue ;
+
+            if (((String)tableOptions[i]).length() != 0)
+                {
+                    onlyColFamOptionSpecified = false;
+                }
+        }
+
+        HColumnDescriptor colDesc = htblDesc.getFamily(colFam.getBytes());
+
+        ChangeFlags status = new ChangeFlags();
+        if (onlyColFamOptionSpecified) {
+            if (colDesc == null) {
+                colDesc = new HColumnDescriptor(colFam);
+                
+                htblDesc.addFamily(colDesc);
+                
+                status.setTableDescriptorChanged();
+            } else
+                return true; // col fam already exists
+        }
+        else {
+            if (colDesc == null)
+                return true; // colDesc must exist
+
+            int defaultVersionsValue = colDesc.getMaxVersions(); 
+
+            status = 
+                setDescriptors(tableOptions,htblDesc /*out*/,colDesc /*out*/, defaultVersionsValue);
+        }
+
+        try {
+            if (transID != 0) {
+                // Transactional alter support
+                table.alter(tblName, tableOptions, transID);
+                if (logger.isDebugEnabled()) logger.debug("HBaseClient.alter(" + tblName + ") called with object length: " + java.lang.reflect.Array.getLength(tableOptions));
+            }
+            else {
+                // the modifyTable and modifyColumn operations are asynchronous,
+                // so we have to have additional code to poll for their completion
+                // (I hear that synchronous versions will be available in HBase 1.x)
+                if (status.tableDescriptorChanged()) {
+                    admin.modifyTable(tblName,htblDesc);
+                    waitForCompletion(tblName,admin);
+                }
+                else if (status.columnDescriptorChanged()) {
+                    admin.modifyColumn(tblName,colDesc);                  
+                    waitForCompletion(tblName,admin);
+                }
+                admin.close();
+            }
+        }
+        catch (IOException e) {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.alter error " + e);
+            throw e;
+        }
+
+        cleanupCache(tblName);
+        return true;
+    }
+
+    public boolean drop(String tblName, long transID)
+             throws MasterNotRunningException, IOException {
+        if (logger.isDebugEnabled()) logger.debug("HBaseClient.drop(" + tblName + ") called.");
+        HBaseAdmin admin = new HBaseAdmin(config);
+        //			admin.disableTableAsync(tblName);
+
+        try {
+           if(transID != 0) {
+              table.dropTable(tblName, transID);
+           }
+           else {
+              admin.disableTable(tblName);
+              admin.deleteTable(tblName);
+              admin.close();
+           }
+        }
+        catch (IOException e) {
+           if (logger.isDebugEnabled()) logger.debug("HBaseClient.drop error " + e);
+           throw e;
+        }
+
+        return cleanupCache(tblName);
+    }
+
+    public boolean dropAll(String pattern) 
+             throws MasterNotRunningException, IOException {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.dropAll(" + pattern + ") called.");
+            HBaseAdmin admin = new HBaseAdmin(config);
+
+	    HTableDescriptor[] htdl = admin.listTables(pattern);
+	    if (htdl == null) // no tables match the given pattern.
+		return true;
+
+	    for (HTableDescriptor htd : htdl) {
+		String tblName = htd.getNameAsString();
+
+                // do not drop DTM log files which have the format: TRAFODION._DTM_.*
+                int idx = tblName.indexOf("TRAFODION._DTM_");
+                if (idx == 0)
+                    continue;
+                
+                //                System.out.println(tblName);
+                admin.disableTable(tblName);
+                admin.deleteTable(tblName);
+	    }
+ 	    
+            admin.close();
+            return cleanup();
+    }
+
+    public ByteArrayList listAll(String pattern) 
+             throws MasterNotRunningException, IOException {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.listAll(" + pattern + ") called.");
+            HBaseAdmin admin = new HBaseAdmin(config);
+
+            ByteArrayList hbaseTables = new ByteArrayList();
+
+	    HTableDescriptor[] htdl = 
+                (pattern.isEmpty() ? admin.listTables() : admin.listTables(pattern));
+
+	    for (HTableDescriptor htd : htdl) {
+		String tblName = htd.getNameAsString();
+
+                //                System.out.println(tblName);
+
+                byte[] b = tblName.getBytes();
+                hbaseTables.add(b);
+	    }
+ 	    
+            admin.close();
+            cleanup();
+            
+            return hbaseTables;
+    }
+
+    public boolean copy(String currTblName, String oldTblName)
+	throws MasterNotRunningException, IOException, SnapshotCreationException, InterruptedException {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.copy(" + currTblName + ", " + oldTblName + ") called.");
+            HBaseAdmin admin = new HBaseAdmin(config);
+	    
+	    String snapshotName = currTblName + "_SNAPSHOT";
+	    
+	    List<SnapshotDescription> l = new ArrayList<SnapshotDescription>(); 
+	    //	    l = admin.listSnapshots(snapshotName);
+	    l = admin.listSnapshots();
+	    if (! l.isEmpty())
+		{
+		    for (SnapshotDescription sd : l) {
+			//			System.out.println("here 1");
+			//			System.out.println(snapshotName);
+			//			System.out.println(sd.getName());
+			if (sd.getName().compareTo(snapshotName) == 0)
+			    {
+				//				System.out.println("here 2");
+				//			    admin.enableTable(snapshotName);
+				//				System.out.println("here 3");
+				admin.deleteSnapshot(snapshotName);
+				//				System.out.println("here 4");
+			    }
+		    }
+		}
+	    //	    System.out.println(snapshotName);
+	    if (! admin.isTableDisabled(currTblName))
+		admin.disableTable(currTblName);
+	    //	    System.out.println("here 5");
+	    admin.snapshot(snapshotName, currTblName);
+	    admin.cloneSnapshot(snapshotName, oldTblName);
+	    admin.deleteSnapshot(snapshotName);
+	    //	    System.out.println("here 6");
+	    admin.enableTable(currTblName);
+            admin.close();
+            return true;
+    }
+
+    public boolean exists(String tblName)  
+           throws MasterNotRunningException, IOException {
+            if (logger.isDebugEnabled()) logger.debug("HBaseClient.exists(" + tblName + ") called.");
+            HBaseAdmin admin = new HBaseAdmin(config);
+            boolean result = admin.tableExists(tblName);
+            admin.close();
+            return result;
+    }
+
+    public HTableClient getHTableClient(long jniObject, String tblName, 
+                  boolean useTRex) throws IOException 
+    {
+       if (logger.isDebugEnabled()) logger.debug("HBaseClient.getHTableClient(" + tblName
+                         + (useTRex ? ", use TRX" : ", no TRX") + ") called.");
+       HTableClient htable = hTableClientsFree.get(tblName);
+       if (htable == null) {
+          htable = new HTableClient();
+          if (htable.init(tblName, useTRex) == false) {
+             if (logger.isDebugEnabled()) logger.debug("  ==> Error in init(), returning null.");
+             return null;
+          }
+          if (logger.isDebugEnabled()) logger.debug("  ==> Created new object.");
+          hTableClientsInUse.put(htable.getTableName(), htable);
+          htable.setJniObject(jniObject);
+          return htable;
+       } else {
+            if (logger.isDebugEnabled()) logger.debug("  ==> Returning existing object, removing from container.");
+            hTableClientsInUse.put(htable.getTableName(), htable);
+            htable.resetAutoFlush();
+           htable.setJniObject(jniObject);
+            return htable;
+       }
+    }
+
+
+    public void releaseHTableClient(HTableClient htable) 
+                    throws IOException {
+        if (htable == null)
+            return;
+	                
+        if (logger.isDebugEnabled()) logger.debug("HBaseClient.releaseHTableClient(" + htable.getTableName() + ").");
+        boolean cleanJniObject = false;
+        // If the thread was interrupted during release, remove the table
+        // from the cache, because the table connection is retried the next
+        // time the table is used.
+        if (htable.release(cleanJniObject))
+           cleanupCache(htable.getTableName());
+        else
+        {
+           if (hTableClientsInUse.remove(htable.getTableName(), htable))
+              hTableClientsFree.put(htable.getTableName(), htable);
+           else
+              if (logger.isDebugEnabled()) logger.debug("Table not found in inUse Pool");
+        }
+    }
+
+    public boolean flushAllTables() throws IOException {
+        if (logger.isDebugEnabled()) logger.debug("HBaseClient.flushAllTables() called.");
+        if (hTableClientsInUse.isEmpty()) {
+           return true;
+        }
+        for (HTableClient htable : hTableClientsInUse.values()) {
+           htable.flush();
+        }
+        return true;
+    }
+
+    public boolean grant(byte[] user, byte[] tblName,
+                         Object[] actionCodes) throws IOException {
+        if (logger.isDebugEnabled()) logger.debug("HBaseClient.grant(" + new String(user) + ", "
+                     + new String(tblName) + ") called.");
+        byte[] colFamily = null;
+
+        Permission.Action[] assigned = new Permission.Action[actionCodes.length];
+        for (int i = 0 ; i < actionCodes.length; i++) {
+            String actionCode = (String)actionCodes[i];
+            assigned[i] = Permission.Action.valueOf(actionCode);
+        }
+
+        //HB98
+        TableName htblName = TableName.valueOf(new String(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME),
+                                               new String(tblName));
+        UserPermission userPerm = new UserPermission(user, htblName,
+                                                     colFamily, assigned);
+
+        AccessController accessController = new AccessController();
+        //HB98 The grant() method is very different in HB98 (commenting out for now)
+        //accessController.grant(userPerm);
+        return true;
+    }
+
+   public boolean revoke(byte[] user, byte[] tblName,
+                          Object[] actionCodes) 
+                     throws IOException {
+        if (logger.isDebugEnabled()) logger.debug("HBaseClient.revoke(" + new String(user) + ", "
+                     + new String(tblName) + ") called.");
+        byte[] colFamily = null;
+
+        Permission.Action[] assigned = new Permission.Action[actionCodes.length];
+        for (int i = 0 ; i < actionCodes.length; i++) {
+            String actionCode = (String)actionCodes[i];
+            assigned[i] = Permission.Action.valueOf(actionCode);
+        }
+
+        //HB98
+        TableName htblName = TableName.valueOf(new String(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME),
+                                               new String(tblName));
+        UserPermission userPerm = new UserPermission(user, htblName,
+                                                     colFamily, assigned);
+
+        AccessController accessController = new AccessController();
+
+        //HB98 The revoke() method is very different in HB98 (commenting out for now)
+        //accessController.revoke(userPerm);
+        return true;
+    }
+
+    // Debugging method to display initial set of KeyValues and sequence
+    // of column qualifiers.
+    private void printQualifiers(HFile.Reader reader, int maxKeys) 
+                 throws IOException {
+      String qualifiers = "";
+      HFileScanner scanner = reader.getScanner(false, false, false);
+      scanner.seekTo();
+      int kvCount = 0;
+      int nonPuts = 0;
+      do {
+        KeyValue kv = scanner.getKeyValue();
+        System.out.println(kv.toString());
+        if (kv.getType() == KeyValue.Type.Put.getCode())
+          qualifiers = qualifiers + kv.getQualifier()[0] + " ";
+        else
+          nonPuts++;
+      } while (++kvCount < maxKeys && scanner.next());
+      System.out.println("First " + kvCount + " column qualifiers: " + qualifiers);
+      if (nonPuts > 0)
+        System.out.println("Encountered " + nonPuts + " non-PUT KeyValue types.");
+    }
+
+    // Estimates the number of rows still in the MemStores of the regions
+    // associated with the passed table name. The number of bytes in the
+    // MemStores is divided by the passed row size in bytes, which is
+    // derived by comparing the row count for an HFile (which in turn is
+    // derived by the number of KeyValues in the file and the number of
+    // columns in the table) to the size of the HFile.
+    private long estimateMemStoreRows(String tblName, int rowSize)
+                 throws MasterNotRunningException, IOException {
+      if (rowSize == 0)
+        return 0;
+
+      HBaseAdmin admin = new HBaseAdmin(config);
+      HTable htbl = new HTable(config, tblName);
+      long totalMemStoreBytes = 0;
+      try {
+        // Get a set of all the regions for the table.
+        Set<HRegionInfo> tableRegionInfos = htbl.getRegionLocations().keySet();
+        Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+        for (HRegionInfo regionInfo : tableRegionInfos) {
+          tableRegions.add(regionInfo.getRegionName());
+        }
+     
+        // Get collection of all servers in the cluster.
+        ClusterStatus clusterStatus = admin.getClusterStatus();
+        Collection<ServerName> servers = clusterStatus.getServers();
+        final long bytesPerMeg = 1024L * 1024L;
+     
+        // For each server, look at each region it contains and see if
+        // it is in the set of regions for the table. If so, add its
+        // MemStore size to the running total.
+        for (ServerName serverName : servers) {
+          ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+          for (RegionLoad regionLoad: serverLoad.getRegionsLoad().values()) {
+            byte[] regionId = regionLoad.getName();
+            if (tableRegions.contains(regionId)) {
+              long regionMemStoreBytes = bytesPerMeg * regionLoad.getMemStoreSizeMB();
+              if (logger.isDebugEnabled()) logger.debug("Region " + regionLoad.getNameAsString()
+                           + " has MemStore size " + regionMemStoreBytes);
+              totalMemStoreBytes += regionMemStoreBytes;
+            }
+          }
+        }
+      }
+      finally {
+        admin.close();
+      }
+
+      // Divide the total MemStore size by the size of a single row.
+      if (logger.isDebugEnabled()) logger.debug("Estimating " + (totalMemStoreBytes / rowSize)
+                   + " rows in MemStores of table's regions.");
+      return totalMemStoreBytes / rowSize;
+    }
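+
+    // Worked example (hypothetical numbers): if the table's regions report a
+    // combined MemStore size of 256 MB and the derived row size is 128 bytes,
+    // the estimate is (256 * 1024 * 1024) / 128 = 2,097,152 rows.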
+
+
+    public float getBlockCacheFraction()
+    {
+        float defCacheFraction = 0.4f;
+        return config.getFloat("hfile.block.cache.size",defCacheFraction);
+    }
+    // Estimates row count for tblName by iterating over the HFiles for
+    // the table, extracting the KeyValue entry count from the file's
+    // trailer block, summing the counts, and dividing by the number of
+    // columns in the table. An adjustment is made for the estimated
+    // number of missing (null) values by sampling the first several
+    // hundred KeyValues to see how many are missing.
+    public boolean estimateRowCount(String tblName, int partialRowSize,
+                                    int numCols, long[] rc)
+                   throws MasterNotRunningException, IOException, ClassNotFoundException, URISyntaxException {
+      if (logger.isDebugEnabled()) logger.debug("HBaseClient.estimateRowCount(" + tblName + ") called.");
+
+      final String REGION_NAME_PATTERN = "[0-9a-f]*";
+      final String HFILE_NAME_PATTERN  = "[0-9a-f]*";
+
+      // To estimate incidence of nulls, read the first 500 rows worth
+      // of KeyValues.
+      final int ROWS_TO_SAMPLE = 500;
+      int putKVsSampled = 0;
+      int nonPutKVsSampled = 0;
+      int nullCount = 0;
+      long totalEntries = 0;   // KeyValues in all HFiles for table
+      long totalSizeBytes = 0; // Size of all HFiles for table 
+      long estimatedTotalPuts = 0;
+      boolean more = true;
+
+      // Access the file system to go directly to the table's HFiles.
+      // Create a reader for the file to access the entry count stored
+      // in the trailer block, and a scanner to iterate over a few
+      // hundred KeyValues to estimate the incidence of nulls.
+      long nano1, nano2;
+      nano1 = System.nanoTime();
+      FileSystem fileSystem = FileSystem.get(config);
+      nano2 = System.nanoTime();
+      if (logger.isDebugEnabled()) logger.debug("FileSystem.get() took " + ((nano2 - nano1) + 500000) / 1000000 + " milliseconds.");
+      CacheConfig cacheConf = new CacheConfig(config);
+      String hbaseRootPath = config.get(HConstants.HBASE_DIR).trim();
+      if (hbaseRootPath.charAt(0) != '/')
+        hbaseRootPath = new URI(hbaseRootPath).getPath();
+      if (logger.isDebugEnabled()) logger.debug("hbaseRootPath = " + hbaseRootPath);
+      FileStatus[] fsArr = fileSystem.globStatus(new Path(
+                               hbaseRootPath + "/data/default/" +
+                               tblName + "/" + REGION_NAME_PATTERN +
+                               "/#1/" + HFILE_NAME_PATTERN));
+      for (FileStatus fs : fsArr) {
+        // Make sure the file name conforms to HFile name pattern.
+        if (!StoreFileInfo.isHFile(fs.getPath())) {
+          if (logger.isDebugEnabled()) logger.debug("Skipped file " + fs.getPath() + " -- not a valid HFile name.");
+          continue;
+        }
+        HFile.Reader reader = HFile.createReader(fileSystem, fs.getPath(), cacheConf, config);
+        try {
+          totalEntries += reader.getEntries();
+          totalSizeBytes += reader.length();
+          //printQualifiers(reader, 100);
+          if (ROWS_TO_SAMPLE > 0 &&
+              totalEntries == reader.getEntries()) {  // first file only
+            // Trafodion column qualifiers are ordinal numbers, which
+            // makes it easy to count missing (null) values. We also count
+            // the non-Put KVs (typically delete-row markers) to estimate
+            // their frequency in the full file set.
+            HFileScanner scanner = reader.getScanner(false, false, false);
+            scanner.seekTo();  //position at beginning of first data block
+            byte currQual = 0;
+            byte nextQual;
+            do {
+              KeyValue kv = scanner.getKeyValue();
+              if (kv.getType() == KeyValue.Type.Put.getCode()) {
+                nextQual = kv.getQualifier()[0];
+                if (nextQual <= currQual)
+                  nullCount += ((numCols - currQual)  // nulls at end of this row
+                              + (nextQual - 1));      // nulls at start of next row
+                else
+                  nullCount += (nextQual - currQual - 1);
+                currQual = nextQual;
+                putKVsSampled++;
+              } else {
+                nonPutKVsSampled++;  // don't count these toward the number
+              }                      //   we want to scan
+            } while ((putKVsSampled + nullCount) < (numCols * ROWS_TO_SAMPLE)
+                     && (more = scanner.next()));
+
+            // If all rows were read, count any nulls at end of last row.
+            if (!more && putKVsSampled > 0)
+              nullCount += (numCols - currQual);
+
+            if (logger.isDebugEnabled()) logger.debug("Sampled " + nullCount + " nulls.");
+          }  // code for first file
+        } finally {
+          reader.close(false);
+        }
+      } // for
+
+      long estimatedEntries = (ROWS_TO_SAMPLE > 0
+                                 ? 0               // get from sample data, below
+                                 : totalEntries);  // no sampling, use stored value
+      if (putKVsSampled > 0) // avoid div by 0 if no Put KVs in sample
+        {
+          estimatedTotalPuts = (putKVsSampled * totalEntries) / 
+                               (putKVsSampled + nonPutKVsSampled);
+          estimatedEntries = ((putKVsSampled + nullCount) * estimatedTotalPuts)
+                                   / putKVsSampled;
+        }
+
+      // Calculate estimate of rows in all HFiles of table.
+      rc[0] = (estimatedEntries + (numCols/2)) / numCols; // round instead of truncate
+
+      // Estimate # of rows in MemStores of all regions of table. Pass
+      // a value to divide the size of the MemStore by. Base this on the
+      // ratio of bytes-to-rows in the HFiles, or the actual row size if
+      // the HFiles were empty.
+      int rowSize;
+      if (rc[0] > 0)
+        rowSize = (int)(totalSizeBytes / rc[0]);
+      else {
+        // From Traf metadata we have calculated and passed in part of the row
+        // size, including size of column qualifiers (col names), which are not
+        // known to HBase.  Add to this the length of the fixed part of the
+        // KeyValue format, times the number of columns.
+        int fixedSizePartOfKV = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE // key len + value len
+                              + KeyValue.KEY_INFRASTRUCTURE_SIZE;     // rowkey & col family len, timestamp, key type
+        rowSize = partialRowSize   // for all cols: row key + col qualifiers + values
+                      + (fixedSizePartOfKV * numCols);
+
+        // Trafodion tables have a single col family at present, so we only look
+        // at the first family name, and multiply its length times the number of
+        // columns. Even if more than one family is used in the future, presumably
+        // they will all be the same short size.
+        HTable htbl = new HTable(config, tblName);
+        HTableDescriptor htblDesc = htbl.getTableDescriptor();
+        HColumnDescriptor[] families = htblDesc.getColumnFamilies();
+        rowSize += (families[0].getName().length * numCols);
+      }
+
+      // Get the estimate of MemStore rows. Add to total after logging
+      // of individual sums below.
+      long memStoreRows = estimateMemStoreRows(tblName, rowSize);
+
+      if (logger.isDebugEnabled()) logger.debug(tblName + " contains a total of " + totalEntries + " KeyValues in all HFiles.");
+      if (logger.isDebugEnabled()) logger.debug("Based on a sample, it is estimated that " + estimatedTotalPuts +
+                   " of these KeyValues are of type Put.");
+      if (putKVsSampled + nullCount > 0)
+        if (logger.isDebugEnabled()) logger.debug("Sampling indicates a null incidence of " + 
+                     (nullCount * 100)/(putKVsSampled + nullCount) +
+                     " percent.");
+      if (logger.isDebugEnabled()) logger.debug("Estimated number of actual values (including nulls) is " + estimatedEntries);
+      if (logger.isDebugEnabled()) logger.debug("Estimated row count in HFiles = " + estimatedEntries +
+                   " / " + numCols + " (# columns) = " + rc[0]);
+      if (logger.isDebugEnabled()) logger.debug("Estimated row count from MemStores = " + memStoreRows);
+
+      rc[0] += memStoreRows;  // Add memstore estimate to total
+      if (logger.isDebugEnabled()) logger.debug("Total estimated row count for " + tblName + " = " + rc[0]);
+      return true;
+    }
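+
+    // Worked example (hypothetical numbers): suppose the HFiles hold
+    // totalEntries = 1,000,000 KeyValues for a table with numCols = 4, and
+    // the sample saw putKVsSampled = 1990, nonPutKVsSampled = 10, and
+    // nullCount = 200. Then
+    //   estimatedTotalPuts = (1990 * 1000000) / (1990 + 10)   = 995,000
+    //   estimatedEntries   = ((1990 + 200) * 995000) / 1990   = 1,095,000
+    //   rc[0]              = (1095000 + 4/2) / 4              = 273,750
+    // before the MemStore estimate is added in.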
+
+
+    /**
+    This method returns node names where Hbase Table regions reside
+    **/
+    public boolean getRegionsNodeName(String tblName, String[] nodeNames)
+                   throws IOException
+    {
+      if (logger.isDebugEnabled()) 
+        logger.debug("HBaseClient.getRegionsNodeName(" + tblName + ") called.");
+
+      HRegionInfo regInfo = null;
+
+
+      HTable htbl = new HTable(config, tblName);
+      if (logger.isDebugEnabled())
+         logger.debug("after HTable call in getRegionsNodeName");
+
+      try {
+        NavigableMap<HRegionInfo, ServerName> locations = htbl.getRegionLocations();
+        if (logger.isDebugEnabled())
+           logger.debug("after htable.getRegionLocations call in getRegionsNodeName");
+
+      
+        String hostName;
+        int regCount = 0;
+
+        for (Map.Entry<HRegionInfo, ServerName> entry: locations.entrySet()) {
+          if (logger.isDebugEnabled()) logger.debug("Entered for loop in getRegionsNodeName");
+          regInfo = entry.getKey();
+          hostName = entry.getValue().getHostname();
+          nodeNames[regCount] = hostName;
+          if (logger.isDebugEnabled()) logger.debug("Hostname for region " + regCount + " is " + hostName);
+          regCount++;
+        }
+      } catch (Exception ie) {
+        if (logger.isDebugEnabled())
+          logger.debug("getRegionLocations throws exception " + ie.getMessage());
+        return false;
+      }
+
+      return true;
+    }
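+
+    // Caller sketch (assumed contract: nodeNames must be pre-allocated with
+    // at least one slot per region of the table):
+    //
+    //   String[] nodeNames = new String[numRegions];
+    //   if (client.getRegionsNodeName("TRAFODION.SCH.T1", nodeNames)) {
+    //       // nodeNames[i] now holds the hostname serving region i
+    //   }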
+
+
+
+    /**
+    This method returns index levels and block size of Hbase Table.
+    Index level is read from  Hfiles trailer block. Randomly selects one region and iterates through all Hfiles
+    in the chosen region and gets the maximum index level.
+    Block size is read from HColumnDescriptor.
+    **/
+    public boolean getHbaseTableInfo(String tblName, int[] tblInfo)
+                   throws MasterNotRunningException, IOException, ClassNotFoundException, URISyntaxException {
+
+      if (logger.isDebugEnabled()) logger.debug("HBaseClient.getHbaseTableInfo(" + tblName + ") called.");
+      final String REGION_NAME_PATTERN = "[0-9a-f]*";
+      final String HFILE_NAME_PATTERN  = "[0-9a-f]*";
+
+      // initialize 
+      int indexLevel = 0;
+      int currIndLevel = 0;
+      int blockSize = 0;
+      tblInfo[0] = indexLevel;
+      tblInfo[1] = blockSize;
+
+      // get block size
+      HTable htbl = new HTable(config, tblName);
+      HTableDescriptor htblDesc = htbl.getTableDescriptor();
+      HColumnDescriptor[] families = htblDesc.getColumnFamilies();
+      blockSize = families[0].getBlocksize();
+      tblInfo[1] = blockSize;
+
+      // Access the file system to go directly to the table's HFiles.
+      long nano1 = 0, nano2 = 0;
+      if (logger.isDebugEnabled())
+        nano1 = System.nanoTime();
+      FileSystem fileSystem = FileSystem.get(config);
+
+      if (logger.isDebugEnabled()) {
+        nano2 = System.nanoTime();
+        logger.debug("FileSystem.get() took " + ((nano2 - nano1) + 500000) / 1000000 + " milliseconds.");
+      }
+      CacheConfig cacheConf = new CacheConfig(config);
+      String hbaseRootPath = config.get(HConstants.HBASE_DIR).trim();
+      if (hbaseRootPath.charAt(0) != '/')
+        hbaseRootPath = new URI(hbaseRootPath).getPath();
+      if (logger.isDebugEnabled()) logger.debug("hbaseRootPath = " + hbaseRootPath);
+
+      String regDir = hbaseRootPath + "/data/default/" + 
+                      tblName + "/" + REGION_NAME_PATTERN + "/#1";
+      if (logger.isDebugEnabled()) logger.debug("region dir = " + regDir);
+
+      //get random region from the list of regions and look at all Hfiles in that region
+      FileStatus[] regArr;
+      try {
+        regArr = fileSystem.globStatus(new Path(regDir));
+      } catch (IOException ioe) {
+        if (logger.isDebugEnabled()) logger.debug("fs.globStatus on region throws IOException");
+        return false; // return index level = 0; and  block size
+      }
+      
+      // logging
+      if (logger.isDebugEnabled()) {
+        for (int i =0; i < regArr.length; i++) 
+          logger.debug("Region Path is " + regArr[i].getPath());
+      }
+      // get random region from the region array
+      int regInd = 0;
+      regInd = tblName.hashCode() % regArr.length;
+
+      Path regName = regArr[regInd].getPath();
+      // Extract the MD5 hash name of the random region from its path, including
+      // the colFam name. We just need part2, which looks something like
+      // /c8fe2d575de62d5d5ffc530bda497bca/#1.
+      String strRegPath = regName.toString();
+      String parts[] = strRegPath.split(tblName);
+      String part2 = parts[1];
+
+      // now remove the regular expression from the region path;
+      // it would look something like /hbase/data/default/<cat.sch.tab>/[0-9a-f]*/#1
+      int j = regDir.indexOf("/[");
+      String regPrefix = regDir.substring(0,j);
+      if (logger.isDebugEnabled()) logger.debug("Region Path prefix = " + regPrefix);
+      String hfilePath = regPrefix + part2 + "/" + HFILE_NAME_PATTERN;
+      
+      if (logger.isDebugEnabled()) logger.debug("Random = " + regInd + ", region is " + regName);
+      if (logger.isDebugEnabled()) logger.debug("Hfile path = " + hfilePath);
+
+      FileStatus[] fsArr;
+      try {
+        fsArr = fileSystem.globStatus(new Path(hfilePath));
+      } catch (IOException ioe) {
+        if (logger.isDebugEnabled()) logger.debug("fs.globStatus on Hfile throws IOException");
+        return false; // return index level = 0; and  block size
+      }
+
+      if (logger.isDebugEnabled()) {
+        for (int i =0; i < fsArr.length; i++)
+          logger.debug("Hfile Path is " + fsArr[i].getPath());
+      }
+     
+      // no HFiles; return from here
+      if (fsArr.length == 0)
+        return true; // return index level = 0; and  block size
+
+      // get maximum index level going through all Hfiles of randomly chosen region
+      if (logger.isDebugEnabled())
+        nano1 = System.nanoTime();
+      for (FileStatus fs : fsArr) {
+        // Make sure the file name conforms to HFile name pattern.
+        if (!StoreFileInfo.isHFile(fs.getPath())) {
+          if (logger.isDebugEnabled()) logger.debug("Skipped file " + fs.getPath() + " -- not a valid HFile name.");
+          continue;
+        }
+
+        // Create a reader for the file to access the index levels stored
+        // in the trailer block
+        HFile.Reader reader = HFile.createReader(fileSystem, fs.getPath(), cacheConf, config);
+        try {
+          FixedFileTrailer trailer = reader.getTrailer();
+          currIndLevel = trailer.getNumDataIndexLevels();
+          // getNumDataIndexLevels() counts the data blocks as a level, so
+          // subtract one to get the pure index depth.
+          if (currIndLevel > 0)
+            currIndLevel = currIndLevel - 1;
+          if (logger.isDebugEnabled())
+            logger.debug("currIndLevel = " + currIndLevel + ", indexLevel = " + indexLevel);
+          if (currIndLevel > indexLevel)
+            indexLevel = currIndLevel;
+        } finally {
+          reader.close(false);
+        }
+      } // for
+
+      if (logger.isDebugEnabled()) {
+        nano2 = System.nanoTime();
+        logger.debug("get index level took " + ((nano2 - nano1) + 500000) / 1000000 + " milliseconds.");
+      }
+
+      tblInfo[0] = indexLevel;
+      if (logger.isDebugEnabled()) {
+        logger.debug("Index Levels for " + tblName + " = " + tblInfo[0]);
+        logger.debug("Block Size for " + tblName + " = " + tblInfo[1]);
+      }
+      
+      return true;
+    }
+
+    void printCell(KeyValue kv) {
+        String rowID = new String(kv.getRow());
+        String colFamily = new String(kv.getFamily());
+        String colName = new String(kv.getQualifier());
+        String colValue = new String(kv.getValue());
+        String row = rowID + ", " + colFamily + ", " + colName + ", "
+            + colValue + ", " + kv.getTimestamp();
+        System.out.println(row);
+    }
+
+    
+  public HBulkLoadClient getHBulkLoadClient() throws IOException 
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBaseClient.getHBulkLoadClient() called.");
+    // The constructor cannot return null, so no null check is needed; if it
+    // fails, report the failure to the caller as a null client.
+    try 
+    {
+      return new HBulkLoadClient(config);
+    }
+    catch (IOException e)
+    {
+      return null;
+    }
+  }
+  public void releaseHBulkLoadClient(HBulkLoadClient hblc) 
+      throws IOException 
+  {
+     if (hblc == null)
+       return;
+          
+      if (logger.isDebugEnabled()) logger.debug("HBaseClient.releaseHBulkLoadClient().");
+      hblc.release();
+   }
+  
+  //returns the latest snapshot name for a table. returns null if table has no snapshots
+  //associated with it
+  public String getLatestSnapshot(String tabName) throws IOException
+  {
+    HBaseAdmin admin = new HBaseAdmin(config);
+    long maxTimeStamp = 0;
+    String latestSnpName = null;
+    try
+    {
+      for (SnapshotDescription snp : admin.listSnapshots())
+      {
+        if (snp.getTable().compareTo(tabName) == 0 && 
+            snp.getCreationTime() > maxTimeStamp)
+        {
+          latestSnpName = snp.getName();
+          maxTimeStamp = snp.getCreationTime();
+        }
+      }
+    }
+    finally
+    {
+      admin.close();
+    }
+    return latestSnpName;
+  }
+  public boolean cleanSnpScanTmpLocation(String pathStr) throws Exception
+  {
+    if (logger.isDebugEnabled()) logger.debug("HbaseClient.cleanSnpScanTmpLocation() - start - Path: " + pathStr);
+    try 
+    {
+      Path delPath = new Path(pathStr );
+      delPath = delPath.makeQualified(delPath.toUri(), null);
+      FileSystem fs = FileSystem.get(delPath.toUri(),config);
+      fs.delete(delPath, true);
+    }
+    catch (IOException e)
+    {
+      if (logger.isDebugEnabled()) logger.debug("HbaseClient.cleanSnpScanTmpLocation() --exception:" + e);
+      throw e;
+    }
+    
+    return true;
+  }
+  private boolean updatePermissionForEntries(FileStatus[] entries, String hbaseUser, FileSystem fs) throws IOException 
+  {
+    if (entries == null) {
+      return true;
+    }
+    
+    for (FileStatus child : entries) {
+      Path path = child.getPath();
+      List<AclEntry> lacl = AclEntry.parseAclSpec("user:" + hbaseUser + ":rwx", true) ;
+      try 
+      {
+        fs.modifyAclEntries(path, lacl);
+      }
+      catch (IOException e)
+      {
+        //if failure just log exception and continue
+        if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.updatePermissionForEntries() exception. " + e);
+      }
+      if (child.isDir()) 
+      {
+        FileStatus[] files = FSUtils.listStatus(fs,path);
+        updatePermissionForEntries(files,hbaseUser, fs);
+      } 
+    }
+    return true;
+  }
+  
+  public boolean setArchivePermissions( String tabName) throws IOException,ServiceException
+  {
+    if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.setArchivePermissions() called. ");
+    Path rootDir = FSUtils.getRootDir(config);
+    FileSystem myfs = FileSystem.get(rootDir.toUri(),config);
+    FileStatus fstatus = myfs.getFileStatus(rootDir);
+    String hbaseUser = fstatus.getOwner(); 
+    assert (hbaseUser != null && hbaseUser.length() != 0);
+    Path tabArcPath = HFileArchiveUtil.getTableArchivePath(config,  TableName.valueOf(tabName));
+    if (tabArcPath == null)
+      return true;
+    List<AclEntry> lacl = AclEntry.parseAclSpec("user:" + hbaseUser + ":rwx", true) ;
+    try
+    {
+      myfs.modifyAclEntries(tabArcPath, lacl);
+    }
+    catch (IOException e)
+    {
+      //if failure just log exception and continue
+      if (logger.isTraceEnabled()) logger.trace("[Snapshot Scan] SnapshotScanHelper.setArchivePermissions() exception. " + e);
+    }
+    FileStatus[] files = FSUtils.listStatus(myfs,tabArcPath);
+    updatePermissionForEntries(files,  hbaseUser, myfs); 
+    return true;
+  }
+
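+  // The three startGet() overloads fetch a single row, an array of row keys,
+  // and a packed buffer of fixed-length row keys (delegated to getRows()),
+  // respectively; each obtains an HTableClient for the table.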
+  public int startGet(long jniObject, String tblName, boolean useTRex, long transID, byte[] rowID,
+                        Object[] columns, long timestamp)
+                        throws IOException {
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      return htc.startGet(transID, rowID, columns, timestamp);
+  }
+
+  public int startGet(long jniObject, String tblName, boolean useTRex, long transID, Object[] rowIDs,
+                        Object[] columns, long timestamp)
+                        throws IOException {
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      return htc.startGet(transID, rowIDs, columns, timestamp);
+  }
+
+  public int startGet(long jniObject, String tblName, boolean useTRex, long transID, short rowIDLen, Object rowIDs,
+                        Object[] columns)
+                        throws IOException {
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      return htc.getRows(transID, rowIDLen, rowIDs, columns);
+  }
+
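+  // The mutation wrappers below share a pattern: for asynchronous operations
+  // the HTableClient is kept open and tagged with the caller's JNI object so
+  // the completion callback can locate it; otherwise the client is released
+  // immediately.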
+  public boolean insertRow(long jniObject, String tblName, boolean useTRex, long transID, byte[] rowID,
+                         Object row,
+                         long timestamp,
+                         boolean checkAndPut,
+                         boolean asyncOperation) throws IOException, InterruptedException, ExecutionException {
+
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      boolean ret = htc.putRow(transID, rowID, row, null, null,
+                                checkAndPut, asyncOperation);
+      if (asyncOperation)
+         htc.setJavaObject(jniObject);
+      else
+         releaseHTableClient(htc);
+      return ret;
+  }
+
+  public boolean checkAndUpdateRow(long jniObject, String tblName, boolean useTRex, long transID, byte[] rowID,
+                         Object columnsToUpdate,
+                         byte[] columnToCheck, byte[] columnValToCheck,
+                         long timestamp,
+                         boolean asyncOperation) throws IOException, InterruptedException, ExecutionException {
+      boolean checkAndPut = true;
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      boolean ret = htc.putRow(transID, rowID, columnsToUpdate, columnToCheck, columnValToCheck,
+                                checkAndPut, asyncOperation);
+      if (asyncOperation)
+         htc.setJavaObject(jniObject);
+      else
+         releaseHTableClient(htc);
+      return ret;
+  }
+
+  public boolean insertRows(long jniObject, String tblName, boolean useTRex, long transID, 
+			 short rowIDLen,
+                         Object rowIDs,
+                         Object rows,
+                         long timestamp,
+                         boolean autoFlush,
+                         boolean asyncOperation) throws IOException, InterruptedException, ExecutionException {
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      boolean ret = htc.putRows(transID, rowIDLen, rowIDs, rows, timestamp, autoFlush, asyncOperation);
+      if (asyncOperation)
+         htc.setJavaObject(jniObject);
+      else
+         releaseHTableClient(htc);
+      return ret;
+  }
+
+  public boolean deleteRow(long jniObject, String tblName, boolean useTRex, long transID, 
+                                 byte[] rowID,
+                                 Object[] columns,
+                                 long timestamp, boolean asyncOperation) throws IOException {
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      boolean ret = htc.deleteRow(transID, rowID, columns, timestamp, asyncOperation);
+      if (asyncOperation)
+         htc.setJavaObject(jniObject);
+      else
+         releaseHTableClient(htc);
+      return ret;
+  }
+
+  public boolean deleteRows(long jniObject, String tblName, boolean useTRex, long transID, short rowIDLen, Object rowIDs,
+                      long timestamp, 
+                      boolean asyncOperation) throws IOException, InterruptedException, ExecutionException {
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      boolean ret = htc.deleteRows(transID, rowIDLen, rowIDs, timestamp, asyncOperation);
+      if (asyncOperation)
+         htc.setJavaObject(jniObject);
+      else
+         releaseHTableClient(htc);
+      return ret;
+  }
+
+  public boolean checkAndDeleteRow(long jniObject, String tblName, boolean useTRex, long transID, 
+                                 byte[] rowID,
+                                 byte[] columnToCheck, byte[] colValToCheck,
+                                 long timestamp, boolean asyncOperation) throws IOException {
+      HTableClient htc = getHTableClient(jniObject, tblName, useTRex);
+      boolean ret = htc.checkAndDeleteRow(transID, rowID, columnToCheck, colValToCheck, timestamp);
+      if (asyncOperation)
+         htc.setJavaObject(jniObject);
+      else
+         releaseHTableClient(htc);
+      return ret;
+  }
+
+  public boolean  createCounterTable(String tabName,  String famName) throws IOException, MasterNotRunningException
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBaseClient.createCounterTable() - start");
+    HBaseAdmin admin = new HBaseAdmin(config);
+    try {
+      if (admin.tableExists(tabName))
+        return true;
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tabName));
+      HColumnDescriptor colDesc = new HColumnDescriptor(famName);
+      // A counter table is non-DTM-transactional.
+      // Use the default maximum versions for MVCC.
+      colDesc.setMaxVersions(DtmConst.MVCC_MAX_VERSION);
+      desc.addFamily(colDesc);
+      admin.createTable(desc);
+    } finally {
+      admin.close();
+    }
+    if (logger.isDebugEnabled()) logger.debug("HBaseClient.createCounterTable() - end");
+    return true;
+  }
+
+  public long incrCounter(String tabName, String rowId, String famName, String qualName, long incrVal) throws Exception
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBaseClient.incrCounter() - start");
+
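+    // incrementColumnValue() is atomic on the region server, so no extra
+    // locking or transaction is needed for the counter update.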
+    HTable myHTable = new HTable(config, tabName);
+    try {
+      return myHTable.incrementColumnValue(Bytes.toBytes(rowId), Bytes.toBytes(famName), Bytes.toBytes(qualName), incrVal);
+    } finally {
+      myHTable.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a44823fe/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java b/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java
new file mode 100644
index 0000000..31d8cac
--- /dev/null
+++ b/core/sql/src/main/java/org/trafodion/sql/HBulkLoadClient.java
@@ -0,0 +1,533 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.sql;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Iterator;
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.security.access.Permission;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.compress.*;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.BloomType; 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.trafodion.sql.HTableClient;
+//import org.trafodion.sql.HBaseClient;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hbase.TableName;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+
+import org.apache.hive.jdbc.HiveDriver;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.lang.ClassNotFoundException;
+
+public class HBulkLoadClient
+{
+  
+  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
+  private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
+  private final static String BULKLOAD_STAGING_DIR = "hbase.bulkload.staging.dir";
+  private final static long MAX_HFILE_SIZE = 10737418240L; //10 GB
+  
+  public static int BLOCKSIZE = 64*1024;
+  public static String COMPRESSION = Compression.Algorithm.NONE.getName();
+  String lastError;
+  static Logger logger = Logger.getLogger(HBulkLoadClient.class.getName());
+  Configuration config;
+  HFile.Writer writer;
+  String hFileLocation;
+  String hFileName;
+  long maxHFileSize = MAX_HFILE_SIZE;
+  FileSystem fileSys = null;
+  String compression = COMPRESSION;
+  int blockSize = BLOCKSIZE;
+  DataBlockEncoding dataBlockEncoding = DataBlockEncoding.NONE;
+  FSDataOutputStream fsOut = null;
+
+  public HBulkLoadClient()
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.HBulkLoadClient() called.");
+  }
+
+  public HBulkLoadClient(Configuration conf) throws IOException
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.HBulkLoadClient(...) called.");
+    config = conf;
+  }
+
+  public String getLastError() {
+    return lastError;
+  }
+
+  void setLastError(String err) {
+      lastError = err;
+  }
+  public boolean initHFileParams(String hFileLoc, String hFileNm, long userMaxSize /*in MBs*/, String tblName,
+                                 String sampleTblName, String sampleTblDDL) 
+  throws UnsupportedOperationException, IOException, SQLException, ClassNotFoundException
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.initHFileParams() called.");
+    
+    hFileLocation = hFileLoc;
+    hFileName = hFileNm;
+    
+    HTable myHTable = new HTable(config, tblName);
+    HTableDescriptor hTableDesc = myHTable.getTableDescriptor();
+    HColumnDescriptor[] hColDescs = hTableDesc.getColumnFamilies();
+    if (hColDescs.length > 2)  // at most 2 column families: 1 for user data, 1 for transaction metadata
+    {
+      myHTable.close();
+      throw new UnsupportedOperationException("only two column families are supported.");
+    }
+
+    compression = hColDescs[0].getCompression().getName();
+    blockSize = hColDescs[0].getBlocksize();
+    dataBlockEncoding = hColDescs[0].getDataBlockEncoding();
+
+    if (userMaxSize == 0)
+    {
+      // No user-specified size: fall back to the table's maximum file size,
+      // or to the 10 GB default when the table does not set one.
+      if (hTableDesc.getMaxFileSize() == -1)
+        maxHFileSize = MAX_HFILE_SIZE;
+      else
+        maxHFileSize = hTableDesc.getMaxFileSize();
+    }
+    else 
+      maxHFileSize = userMaxSize * 1024 * 1024;  // userMaxSize is in MBs
+
+    myHTable.close();
+
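+    // When a sample-table DDL is supplied, (re)create that Hive table through
+    // the embedded HiveServer2 JDBC driver before writing HFiles.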
+    if (sampleTblDDL.length() > 0)
+    {
+      Class.forName("org.apache.hive.jdbc.HiveDriver");
+      Connection conn = DriverManager.getConnection("jdbc:hive2://", "hive", "");
+      Statement stmt = conn.createStatement();
+      try {
+        stmt.execute("drop table if exists " + sampleTblName);
+        stmt.execute(sampleTblDDL);
+      } finally {
+        stmt.close();
+        conn.close();
+      }
+    }
+
+    return true;
+  }
+  public boolean doCreateHFile() throws IOException, URISyntaxException
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doCreateHFile() called.");
+    
+    if (hFileLocation == null)
+      throw new NullPointerException("hFileLocation is not set");
+    if (hFileName == null)
+      throw new NullPointerException("hFileName is not set");
+    
+    closeHFile();
+    
+    if (fileSys == null)
+     fileSys = FileSystem.get(config); 
+
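+    // suffix the file name with the current time in milliseconds so repeated
+    // writers in the same staging directory never collide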
+    Path hfilePath = new Path(new Path(hFileLocation ), hFileName + "_" +  System.currentTimeMillis());
+    hfilePath = hfilePath.makeQualified(hfilePath.toUri(), null);
+
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.createHFile Path: " + hfilePath);
+
+    try
+    {
+      HFileContext hfileContext = new HFileContextBuilder()
+                                 .withBlockSize(blockSize)
+                                 .withCompression(Compression.getCompressionAlgorithmByName(compression))
+                                 .withDataBlockEncoding(dataBlockEncoding)
+                                 .build();
+
+      writer =    HFile.getWriterFactory(config, new CacheConfig(config))
+                     .withPath(fileSys, hfilePath)
+                     .withFileContext(hfileContext)
+                     .withComparator(KeyValue.COMPARATOR)
+                     .create();
+      if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.createHFile Path: " + writer.getPath() + "Created");
+    }
+    catch (IOException e)
+    {
+       if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doCreateHFile Exception" + e.getMessage());
+       throw e;
+    }
+    return true;
+  }
+  
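+  // Roll to a new HFile once the current one exceeds maxHFileSize, so the
+  // generated files stay within the table's configured maximum file size.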
+  public boolean isNewFileNeeded() throws IOException
+  {
+    if (writer == null)
+      return true;
+    
+    if (fileSys == null)
+      fileSys = FileSystem.get(writer.getPath().toUri(),config);
+    
+    if (fileSys.getFileStatus(writer.getPath()).getLen() > maxHFileSize)
+     return true;
+
+    return false;
+  }
+
+  public boolean addToHFile(short rowIDLen, Object rowIDs,
+                Object rows) throws IOException, URISyntaxException
+  {
+     if (logger.isDebugEnabled()) logger.debug("Enter addToHFile() ");
+     Put put;
+    if (isNewFileNeeded())
+    {
+      doCreateHFile();
+    }
+     ByteBuffer bbRows, bbRowIDs;
+     short numCols, numRows;
+     short colNameLen;
+     int colValueLen;
+     byte[] colName, colValue, rowID;
+     short actRowIDLen;
+
+     bbRowIDs = (ByteBuffer)rowIDs;
+     bbRows = (ByteBuffer)rows;
+     numRows = bbRowIDs.getShort();
+     HTableClient htc = new HTableClient();
+     long now = System.currentTimeMillis();
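+     // Expected buffer layout (as consumed below): rowIDs holds a short row
+     // count, then per row a one-byte suffix flag ('1' means the key carries
+     // one extra trailing byte) followed by the key bytes; rows holds, per
+     // row, a short column count and, per column, a length-prefixed encoded
+     // column name (split into family and qualifier by HTableClient) and a
+     // length-prefixed value.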
+     for (short rowNum = 0; rowNum < numRows; rowNum++) 
+     {
+        byte rowIDSuffix  = bbRowIDs.get();
+        if (rowIDSuffix == '1')
+           actRowIDLen = (short)(rowIDLen+1);
+        else
+           actRowIDLen = rowIDLen;
+        rowID = new byte[actRowIDLen];
+        bbRowIDs.get(rowID, 0, actRowIDLen);
+        numCols = bbRows.getShort();
+        for (short colIndex = 0; colIndex < numCols; colIndex++)
+        {
+            colNameLen = bbRows.getShort();
+            colName = new byte[colNameLen];
+            bbRows.get(colName, 0, colNameLen);
+            colValueLen = bbRows.getInt();
+            colValue = new byte[colValueLen];
+            bbRows.get(colValue, 0, colValueLen);
+            KeyValue kv = new KeyValue(rowID,
+                                htc.getFamily(colName), 
+                                htc.getName(colName), 
+                                now,
+                                colValue);
+            writer.append(kv);
+        } 
+    }
+    if (logger.isDebugEnabled()) logger.debug("End addToHFile() ");
+       return true;
+  }
+
+  public boolean closeHFile() throws IOException
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.closeHFile() called." + ((writer == null) ? "NULL" : "NOT NULL"));
+
+    if (writer == null)
+      return false;
+    
+    writer.close();
+    return true;
+  }
+
+  private boolean createSnapshot( String tableName, String snapshotName)
+      throws MasterNotRunningException, IOException, SnapshotCreationException, InterruptedException, Exception
+  {
+    HBaseAdmin admin = null;
+    try 
+    {
+      admin = new HBaseAdmin(config);
+      List<SnapshotDescription>  lstSnaps = admin.listSnapshots();
+      if (! lstSnaps.isEmpty())
+      {
+        for (SnapshotDescription snpd : lstSnaps) 
+        {
+            if (snpd.getName().compareTo(snapshotName) == 0)
+            {
+              if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.createSnapshot() -- deleting: " + snapshotName + " : " + snpd.getName());
+              admin.deleteSnapshot(snapshotName);
+            }
+        }
+      }
+      admin.snapshot(snapshotName, tableName);
+   }
+    catch (Exception e)
+    {
+      //log the exception and rethrow it to the caller
+      if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.createSnapshot() - Exception: " + e);
+      throw e;
+    }
+    finally
+    {
+      //close HBaseAdmin instance 
+      if (admin !=null)
+        admin.close();
+    }
+    return true;
+  }
+  
+  private boolean restoreSnapshot( String snapshotName, String tableName)
+      throws IOException, RestoreSnapshotException, Exception
+  {
+    HBaseAdmin admin = null;
+    try
+    {
+      admin = new HBaseAdmin(config);
+      if (! admin.isTableDisabled(tableName))
+          admin.disableTable(tableName);
+      
+      admin.restoreSnapshot(snapshotName);
+  
+      admin.enableTable(tableName);
+    }
+    catch (Exception e)
+    {
+      //log the exception and rethrow it to the caller
+      if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.restoreSnapshot() - Exception: " + e);
+      throw e;
+    }
+    finally
+    {
+      //close HBaseAdmin instance 
+      if (admin != null) 
+        admin.close();
+    }
+
+    return true;
+  }
+  private boolean deleteSnapshot( String snapshotName, String tableName)
+      throws IOException, Exception
+  {
+    
+    HBaseAdmin admin = null;
+    boolean snapshotExists = false;
+    try
+    {
+      admin = new HBaseAdmin(config);
+      List<SnapshotDescription>  lstSnaps = admin.listSnapshots();
+      if (! lstSnaps.isEmpty())
+      {
+        for (SnapshotDescription snpd : lstSnaps) 
+        {
+          if (snpd.getName().compareTo(snapshotName) == 0)
+          {
+            snapshotExists = true;
+            break;
+          }
+        }
+      }
+      if (!snapshotExists)
+        return true;
+      if (admin.isTableDisabled(tableName))
+          admin.enableTable(tableName);
+      admin.deleteSnapshot(snapshotName);
+    }
+    catch (Exception e)
+    {
+      //log the exception and rethrow it to the caller
+      if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.deleteSnapshot() - Exception: " + e);
+      throw e;
+    }
+    finally 
+    {
+      //close HBaseAdmin instance 
+      if (admin != null) 
+        admin.close();
+    }
+    return true;
+  }
+  
+  private void doSnapshotNBulkLoad(Path hFilePath, String tableName, HTable table, LoadIncrementalHFiles loader, boolean snapshot)
+      throws MasterNotRunningException, IOException, SnapshotCreationException, InterruptedException, RestoreSnapshotException, Exception
+  {
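+    // When 'snapshot' is true, take a snapshot before loading so a failed
+    // bulk load can be rolled back by restoring it; the finally block removes
+    // the snapshot in either case.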
+    String snapshotName = null;
+    if (snapshot)
+    {
+      snapshotName = tableName + "_SNAPSHOT";
+      createSnapshot(tableName, snapshotName);
+      if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.doSnapshotNBulkLoad() - snapshot created: " + snapshotName);
+    }
+    try
+    {
+      if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.doSnapshotNBulkLoad() - bulk load started ");
+      loader.doBulkLoad(hFilePath, table);
+      if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.doSnapshotNBulkLoad() - bulk load is done ");
+    }
+    catch (IOException e)
+    {
+      if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.doSnapshotNBulkLoad() - Exception: " + e.toString());
+      if (snapshot)
+      {
+        restoreSnapshot(snapshotName, tableName);
+        if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.doSnapshotNBulkLoad() - snapshot restored: " + snapshotName);
+      }
+      // rethrow so the caller always sees the failure; the finally block
+      // cleans up the snapshot
+      throw e;
+    }
+    finally
+    {
+      if  (snapshot)
+      {
+        deleteSnapshot(snapshotName, tableName);
+        if (logger.isDebugEnabled()) logger.debug("HbulkLoadClient.doSnapshotNBulkLoad() - snapshot deleted: " + snapshotName);
+      }
+    }
+    
+  }
+  public boolean doBulkLoad(String prepLocation, String tableName, boolean quasiSecure, boolean snapshot) throws Exception
+  {
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doBulkLoad() - start");
+    if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doBulkLoad() - Prep Location: " + prepLocation + 
+                                             ", Table Name:" + tableName + 
+                                             ", quasisecure : " + quasiSecure +
+                                             ", snapshot: " + snapshot);
+
+    if (quasiSecure)
+    {
+      throw new Exception("HBulkLoadClient.doBulkLoad() - cannot perform load. Trafodion on secure HBase is not implemented yet");
+    }
+
+    HTable table = new HTable(config, tableName);
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(config);
+    Path prepPath = new Path(prepLocation);
+    prepPath = prepPath.makeQualified(prepPath.toUri(), null);
+    FileSystem prepFs = FileSystem.get(prepPath.toUri(), config);
+
+    Path[] hFams = FileUtil.stat2Paths(prepFs.listStatus(prepPath));
+
+    try
+    {
+      if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doBulkLoad() - adjusting HFile permissions");
+      for (Path hfam : hFams) 
+      {
+         Path[] hfiles = FileUtil.stat2Paths(prepFs.listStatus(hfam));
+         prepFs.setPermission(hfam, PERM_ALL_ACCESS);
+         for (Path hfile : hfiles)
+         {
+           if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doBulkLoad() - adjusting HFile permissions: " + hfile);
+           prepFs.setPermission(hfile, PERM_ALL_ACCESS);
+         }
+         // create a _tmp dir used as temp space for HFile processing
+         FileSystem.mkdirs(prepFs, new Path(hfam, "_tmp"), PERM_ALL_ACCESS);
+      }
+      if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doBulkLoad() - bulk load started, loading directly from the preparation directory");
+      doSnapshotNBulkLoad(prepPath, tableName, table, loader, snapshot);
+      if (logger.isDebugEnabled()) logger.debug("HBulkLoadClient.doBulkLoad() - bulk load is done");
+    }
+    finally
+    {
+      // close the HTable even when permission fixes or the load itself fail
+      table.close();
+    }
+    return true;
+  }
+
+  public boolean bulkLoadCleanup(String location) throws Exception
+  {
+      Path dir = new Path(location );
+      dir = dir.makeQualified(dir.toUri(), null);
+      FileSystem fs = FileSystem.get(dir.toUri(),config);
+      fs.delete(dir, true);
+      
+      return true;
+
+  }
+  
+  public boolean release( ) throws IOException {
+    if (writer != null)
+    {
+       writer.close();
+       writer = null;
+    }
+    if (fileSys != null)
+    {
+      fileSys.close();
+      fileSys = null;
+    }
+    // plain field resets need no null checks
+    config = null;
+    hFileLocation = null;
+    hFileName = null;
+    compression = null;
+    return true;
+  }
+}