You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:03:05 UTC
svn commit: r1181374 - in /hbase/branches/0.89: bin/
src/test/java/org/apache/hadoop/hbase/loadtest/
Author: nspiegelberg
Date: Tue Oct 11 02:03:05 2011
New Revision: 1181374
URL: http://svn.apache.org/viewvc?rev=1181374&view=rev
Log:
Load test suite
Summary:
This test suite enables running load tests against hbase in ways similar to the
appserver use cases. Multiple column families for different use-cases can be
tested together.
Currently provided use-cases are 'ActionLog', 'Snapshot' and 'search'. The
interface is developed such that it can be easily extended to cover more
use-cases.
All parameters including rows, timestamps, number of columns and size of data
are configurable using a properties file. A filter type can also be specified
individually for each column family using 'FilterType'. Versioning is also
tested using configurable parameters.
The use-cases are in accordance with discussions held with Karthik and
Prashant. This test is for internal purposes only and will not be committed to
public trunk.
Test Plan:
Have run lots of tests using it and compared the results. Ran it for all
combination of column families: ActionLog, Snapshot and Search. Did manual
debugging and results analysis.
Moreover, this code is itself a test :)
DiffCamp Revision: 146646
Reviewed By: kannan
CC: davidrecordon, achao, kannan, pkhaitan, hbase@lists
Tasks:
#289860: Adding to load test framework
#297166: Functional tests to verify cluster - could re-use this to check
cluster provisioning for new clusters as well
Revert Plan:
OK
Added:
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/ColumnFamilyProperties.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DataGenerator.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DisplayFormatUtils.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/HBaseUtils.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedAction.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedReader.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedWriter.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/Tester.java
Modified:
hbase/branches/0.89/bin/hbase
Modified: hbase/branches/0.89/bin/hbase
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/bin/hbase?rev=1181374&r1=1181373&r2=1181374&view=diff
==============================================================================
--- hbase/branches/0.89/bin/hbase (original)
+++ hbase/branches/0.89/bin/hbase Tue Oct 11 02:03:05 2011
@@ -76,6 +76,7 @@ if [ $# = 0 ]; then
echo " avro run an HBase Avro server"
echo " migrate upgrade an hbase.rootdir"
echo " hbck run the hbase 'fsck' tool"
+ echo " verify [-help] verify that the cluster is working properly"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
echo "Most commands print help when invoked w/o parameters."
@@ -255,6 +256,8 @@ elif [ "$COMMAND" = "migrate" ] ; then
CLASS='org.apache.hadoop.hbase.util.Migrate'
elif [ "$COMMAND" = "hbck" ] ; then
CLASS='org.apache.hadoop.hbase.client.HBaseFsck'
+elif [ "$COMMAND" = "verify" ] ; then
+ CLASS='org.apache.hadoop.hbase.loadtest.Tester'
elif [ "$COMMAND" = "zookeeper" ] ; then
CLASS='org.apache.hadoop.hbase.zookeeper.HQuorumPeer'
if [ "$1" != "stop" ] ; then
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/ColumnFamilyProperties.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/ColumnFamilyProperties.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/ColumnFamilyProperties.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/ColumnFamilyProperties.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,138 @@
+package org.apache.hadoop.hbase.loadtest;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ColumnFamilyProperties {
+
+ private static final Log LOG = LogFactory.getLog(Tester.class);
+
+ public static int familyIndex = 1;
+
+ public String familyName;
+ public int startTimestamp;
+ public int endTimestamp;
+ public int minColDataSize;
+ public int maxColDataSize;
+ public int minColsPerKey;
+ public int maxColsPerKey;
+ public int maxVersions;
+ public String filterType;
+ public String bloomType;
+ public String compressionType;
+
+ public void print() {
+ LOG.info("\n\nColumnName: " + familyName);
+ LOG.info("Timestamp Range: " + startTimestamp + "..." + endTimestamp);
+ LOG.info("Number of Columns/Key:" + minColsPerKey + "...." + maxColsPerKey);
+ LOG.info("Data Size/Column:" + minColDataSize + "..." + maxColDataSize);
+ LOG.info("Max Versions: " + maxVersions);
+ LOG.info("Filter type: " + filterType);
+ LOG.info("Bloom type: " + bloomType);
+ LOG.info("Compression type: " + compressionType + "\n\n");
+ }
+
+ public static final String defaultColumnProperties =
+ "\nReaderThreads=10" +
+ "\nWriterThreads=10" +
+ "\nStartKey=1" +
+ "\nEndKey=1000000" +
+ "\nNumberOfFamilies=3" +
+ "\nVerifyPercent=2" +
+ "\nVerbose=true" +
+ "\nClusterTestTime=120" +
+ "\nBulkLoad=true" +
+ "\nRegionsPerServer=7" +
+ "\n" +
+ "\nCF1_Name=Actions" +
+ "\nCF1_StartTimestamp=10" +
+ "\nCF1_EndTimestamp=1000" +
+ "\nCF1_MinDataSize=1" +
+ "\nCF1_MaxDataSize=10" +
+ "\nCF1_MinColsPerKey=1" +
+ "\nCF1_MaxColsPerKey=1" +
+ "\nCF1_MaxVersions=2147483647" +
+ "\nCF1_FilterType=Timestamps" +
+ "\nCF1_BloomType=Row" +
+ "\nCF1_CompressionType=None" +
+ "\n" +
+ "\nCF2_Name=Snapshot" +
+ "\nCF2_StartTimestamp=10" +
+ "\nCF2_EndTimestamp=20" +
+ "\nCF2_MinDataSize=500" +
+ "\nCF2_MaxDataSize=1000000" +
+ "\nCF2_MinColsPerKey=1" +
+ "\nCF2_MaxColsPerKey=1" +
+ "\nCF2_MaxVersions=1" +
+ "\nCF2_FilterType=None" +
+ "\nCF2_BloomType=None" +
+ "\nCF2_CompressionType=LZO" +
+ "\n" +
+ "\nCF3_Name=IndexSnapshot" +
+ "\nCF3_StartTimestamp=20" +
+ "\nCF3_EndTimestamp=100" +
+ "\nCF3_MinDataSize=1" +
+ "\nCF3_MaxDataSize=10" +
+ "\nCF3_MinColsPerKey=1" +
+ "\nCF3_MaxColsPerKey=1000" +
+ "\nCF3_MaxVersions=1" +
+ "\nCF3_FilterType=ColumnPrefix" +
+ "\nCF3_BloomType=RowCol" +
+ "\nCF3_CompressionType=GZ" +
+ "";
+}
+
+/**
+If creating an external file, you should use the following as a starting point and
+make whatever changes you want. It would be best not to omit any fields.
+
+ReaderThreads=10
+WriterThreads=10
+StartKey=1
+EndKey=1000000
+NumberOfFamilies=3
+VerifyPercent=2
+Verbose=true
+ClusterTestTime=120
+BulkLoad=true
+RegionsPerServer=7
+
+CF1_Name=Actions
+CF1_StartTimestamp=10
+CF1_EndTimestamp=1000
+CF1_MinDataSize=1
+CF1_MaxDataSize=10
+CF1_MinColsPerKey=1
+CF1_MaxColsPerKey=1
+CF1_MaxVersions=2147483647
+CF1_FilterType=Timestamps
+CF1_BloomType=Row
+CF1_CompressionType=None
+
+CF2_Name=Snapshot
+CF2_StartTimestamp=10
+CF2_EndTimestamp=20
+CF2_MinDataSize=500
+CF2_MaxDataSize=1000000
+CF2_MinColsPerKey=1
+CF2_MaxColsPerKey=1
+CF2_MaxVersions=1
+CF2_FilterType=None
+CF2_BloomType=None
+CF2_CompressionType=LZO
+
+CF3_Name=IndexSnapshot
+CF3_StartTimestamp=20
+CF3_EndTimestamp=100
+CF3_MinDataSize=1
+CF3_MaxDataSize=10
+CF3_MinColsPerKey=1
+CF3_MaxColsPerKey=1000
+CF3_MaxVersions=1
+CF3_FilterType=ColumnPrefix
+CF3_BloomType=RowCol
+CF3_CompressionType=GZ
+
+*
+*
+*/
\ No newline at end of file
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DataGenerator.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DataGenerator.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DataGenerator.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,220 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.loadtest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class DataGenerator {
+ private static final Log LOG = LogFactory.getLog(HBaseUtils.class);
+
+ static Random random_ = new Random();
+ /* one byte fill pattern */
+ public static final String fill1B_ = "-";
+ /* 64 byte fill pattern */
+ public static final String fill64B_ =
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ";
+ /* alternate 64 byte fill pattern */
+ public static final String fill64BAlt_ =
+ "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789+-";
+ /* 1K fill pattern */
+ public static final String fill1K_ =
+ fill64BAlt_ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_
+ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_
+ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_ + fill64BAlt_
+ + fill64BAlt_;
+
+ static public String paddedNumber(long key) {
+ // left-pad key with zeroes to 10 decimal places.
+ String paddedKey = String.format("%010d", key);
+ // flip the key to randomize
+ return (new StringBuffer(paddedKey)).reverse().toString();
+ }
+
+ public static byte[] getDataInSize(long key, int dataSize) {
+ StringBuilder sb = new StringBuilder();
+
+ // write the key first
+ int sizeLeft = dataSize;
+ String keyAsString = DataGenerator.paddedNumber(key);
+ sb.append(keyAsString);
+ sizeLeft -= keyAsString.length();
+
+ for (int i = 0; i < sizeLeft / 1024; ++i) {
+ sb.append(fill1K_);
+ }
+ sizeLeft = sizeLeft % 1024;
+ for (int i = 0; i < sizeLeft / 64; ++i) {
+ sb.append(fill64B_);
+ }
+ sizeLeft = sizeLeft % 64;
+ for (int i = 0; i < dataSize % 64; ++i) {
+ sb.append(fill1B_);
+ }
+
+ return sb.toString().getBytes();
+ }
+
+ public static int getValueLength(int minDataSize, int maxDataSize, long rowKey) {
+ return Math.abs(minDataSize
+ + hash((int) rowKey, (maxDataSize - minDataSize + 1)));
+ }
+
+ public static int getNumberOfColumns(long minColumnsPerKey,
+ long maxColumnsPerKey, long rowKey) {
+ return Math.abs((int) minColumnsPerKey
+ + hash((int) rowKey, (int) (maxColumnsPerKey - minColumnsPerKey + 1)));
+ }
+
+ public static int hash(int key, int mod) {
+ return key % mod;
+ }
+
+ public static byte[] getDataInSize(long row, int column, int timestamp,
+ int minDataSize, int maxDataSize) {
+ int dataSize = getValueLength(minDataSize, maxDataSize, row);
+ return getDataInSize(row * column * timestamp, dataSize);
+ }
+
+ public static TreeSet<KeyValue> getSortedResultSet(long rowID,
+ ColumnFamilyProperties familyProperty) {
+ TreeSet<KeyValue> kvSet =
+ new TreeSet<KeyValue>(new KeyValue.KVComparator());
+ // byte[] row = DataGenerator.paddedKey(rowKey).getBytes();
+ byte[] row = RegionSplitter.getHBaseKeyFromRowID(rowID);
+ int numColumns =
+ getNumberOfColumns(familyProperty.minColsPerKey,
+ familyProperty.maxColsPerKey, rowID);
+ byte[] family = Bytes.toBytes(familyProperty.familyName);
+ for (int colIndex = 0; colIndex <= numColumns; ++colIndex) {
+ byte[] column = (DataGenerator.paddedNumber(colIndex)).getBytes();
+ for (int timestamp = familyProperty.startTimestamp; timestamp <= familyProperty.endTimestamp; timestamp++) {
+ byte[] value =
+ getDataInSize(rowID, colIndex, timestamp,
+ familyProperty.minColDataSize, familyProperty.maxColDataSize);
+ KeyValue kv = new KeyValue(row, family, column, timestamp, value);
+ kvSet.add(kv);
+ }
+ }
+ return kvSet;
+ }
+
+ /**
+ * Returns a set containing keys for the passed row based on the information
+ * in familyProperties. This is a slightly redundant form of the above
+ * function but is required for efficiency.
+ *
+ * @param rowID
+ * @param familyProperties
+ * @return
+ */
+ public static Put getPut(long rowID, ColumnFamilyProperties[] familyProperties) {
+ // Put put = new Put(DataGenerator.paddedKey(rowKeyID).getBytes());
+ byte[] row = RegionSplitter.getHBaseKeyFromRowID(rowID);
+ Put put = new Put(row);
+ for (ColumnFamilyProperties familyProperty : familyProperties) {
+ int numColumns =
+ getNumberOfColumns(familyProperty.minColsPerKey,
+ familyProperty.maxColsPerKey, rowID);
+ byte[] family = Bytes.toBytes(familyProperty.familyName);
+ for (int colIndex = 0; colIndex <= numColumns; ++colIndex) {
+ byte[] column = (DataGenerator.paddedNumber(colIndex)).getBytes();
+ for (int timestamp = familyProperty.startTimestamp; timestamp <= familyProperty.endTimestamp; timestamp++) {
+ byte[] value =
+ getDataInSize(rowID, colIndex, timestamp,
+ familyProperty.minColDataSize, familyProperty.maxColDataSize);
+ put.add(family, column, timestamp, value);
+ }
+ }
+ }
+ return put;
+ }
+
+ public static TreeSet<KeyValue> filterAndVersioningForSingleRowFamily(
+ TreeSet<KeyValue> kvSet, Filter filter, int maxVersions) {
+ int currentVersions = 0;
+ byte[] prevColumn = null;
+ TreeSet<KeyValue> filteredSet =
+ new TreeSet<KeyValue>(new KeyValue.KVComparator());
+ for (KeyValue kv : kvSet) {
+ if (filter == null
+ || filter.filterKeyValue(kv).equals(Filter.ReturnCode.INCLUDE)) {
+ byte[] column = kv.getQualifier();
+ if (Bytes.equals(prevColumn, column)) {
+ currentVersions++;
+ } else {
+ prevColumn = column;
+ currentVersions = 1;
+ }
+ if (currentVersions <= maxVersions) {
+ filteredSet.add(kv);
+ }
+ }
+ }
+ return filteredSet;
+ }
+
+ public static TimestampsFilter getTimestampFilter(long rowKey,
+ ColumnFamilyProperties familyProperty) {
+ double timestampSelectionFrequency = 0.01;
+ List<Long> timestamps = new ArrayList<Long>();
+ for (long timestamp = familyProperty.startTimestamp; timestamp <= familyProperty.endTimestamp; timestamp++) {
+ if (timestampSelectionFrequency >= Math.random()) {
+ timestamps.add(timestamp);
+ }
+ }
+ TimestampsFilter timestampsFilter = new TimestampsFilter(timestamps);
+ return timestampsFilter;
+ }
+
+ public static Filter getColumnPrefixFilter(long rowKey,
+ ColumnFamilyProperties familyProperty) {
+ int randomNumber = (int) (Math.random() * 20);
+ byte[] prefix = Bytes.toBytes(randomNumber);
+ return new ColumnPrefixFilter(prefix);
+ }
+
+ public static Filter getFilter(long rowKey,
+ ColumnFamilyProperties familyProperty) {
+ if (familyProperty.filterType == null
+ || familyProperty.filterType.equalsIgnoreCase("None")) {
+ return null;
+ } else if (familyProperty.filterType.equalsIgnoreCase("Timestamps")) {
+ return getTimestampFilter(rowKey, familyProperty);
+ } else if (familyProperty.filterType.equalsIgnoreCase("ColumnPrefix")) {
+ return getColumnPrefixFilter(rowKey, familyProperty);
+ } else {
+ LOG.info("FilterType " + familyProperty.filterType + " not recognized!"
+ + "Currently supported filter types are 'Timestamps' and 'None'");
+ return null;
+ }
+ }
+}
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DisplayFormatUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DisplayFormatUtils.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DisplayFormatUtils.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/DisplayFormatUtils.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.loadtest;
+
+public class DisplayFormatUtils {
+ public static String formatTime(long elapsedTime) {
+ String format = String.format("%%0%dd", 2);
+ elapsedTime = elapsedTime / 1000;
+ String seconds = String.format(format, elapsedTime % 60);
+ String minutes = String.format(format, (elapsedTime % 3600) / 60);
+ String hours = String.format(format, elapsedTime / 3600);
+ String time = hours + ":" + minutes + ":" + seconds;
+ return time;
+ }
+
+ public static String formatNumber(long number) {
+ if (number >= 1000000000) {
+ return ((number / 1000000000) + "B");
+ } else if (number >= 1000000) {
+ return ((number / 1000000) + "M");
+ } else if (number >= 1000) {
+ return ((number / 1000) + "K");
+ } else {
+ return (number + "");
+ }
+ }
+}
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/HBaseUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/HBaseUtils.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/HBaseUtils.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/HBaseUtils.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,182 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.loadtest;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class HBaseUtils {
+ private static final Log LOG = LogFactory.getLog(HBaseUtils.class);
+
+ public static void sleep(int millisecs) {
+ try {
+ Thread.sleep(millisecs);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ public static HTable getHTable(Configuration conf, byte[] tableName) {
+ HTable table = null;
+ try {
+ table = new HTable(conf, tableName);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return table;
+ }
+
+ public static boolean createTableIfNotExists(Configuration conf,
+ byte[] tableName, ColumnFamilyProperties[] familyProperties,
+ int regionsPerServer) {
+ // LOG.info("Creating table if not exists: "+Bytes.toString(tableName));
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ for (ColumnFamilyProperties familyProperty : familyProperties) {
+ // LOG.info("CF Name: "+familyProperty.familyName);
+ String bloomType = familyProperty.bloomType;
+ if (bloomType == null) {
+ bloomType = "NONE";
+ }
+ String compressionType = familyProperty.compressionType;
+ if (compressionType == null) {
+ compressionType = "NONE";
+ }
+ desc.addFamily(new HColumnDescriptor(Bytes
+ .toBytes(familyProperty.familyName), familyProperty.maxVersions,
+ compressionType,
+ HColumnDescriptor.DEFAULT_IN_MEMORY,
+ HColumnDescriptor.DEFAULT_BLOCKCACHE,
+ HColumnDescriptor.DEFAULT_TTL,
+ bloomType));
+ }
+ try {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ int numberOfServers = admin.getClusterStatus().getServers();
+ int totalNumberOfRegions = numberOfServers * regionsPerServer;
+ if (totalNumberOfRegions == 0) {
+ admin.createTable(desc);
+ } else {
+ byte[][] splits = RegionSplitter.splitKeys(HashingSchemes.MD5,
+ totalNumberOfRegions);
+ admin.createTable(desc, splits);
+ }
+ return true;
+ } catch (MasterNotRunningException e) {
+ LOG.error("Master not running.");
+ e.printStackTrace();
+ return false;
+ } catch (TableExistsException e) {
+ LOG.info("Table already exists.");
+ return true;
+ } catch (IOException e) {
+ LOG.error("IO Exception.");
+ e.printStackTrace();
+ return false;
+ } catch (Exception e) {
+ LOG.error("Exception.");
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public static boolean deleteTable(Configuration conf, byte[] tableName) {
+ // LOG.info("Deleting table: "+Bytes.toString(tableName) + "....");
+ try {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.disableTable(tableName);
+ admin.deleteTable(tableName);
+ return true;
+ } catch (MasterNotRunningException e) {
+ LOG.error("Master not running.");
+ e.printStackTrace();
+ return false;
+ } catch (TableNotFoundException e) {
+ LOG.info("Table does not exist.");
+ return false;
+ } catch (TableNotDisabledException e) {
+ LOG.info("Table not disabled.");
+ return false;
+ } catch (IOException e) {
+ LOG.error("IO Exception.");
+ e.printStackTrace();
+ return false;
+ } catch (Exception e) {
+ LOG.error("Exception.");
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public static boolean flushTable(Configuration conf, byte[] tableName) {
+ // LOG.info("Flushing table: "+Bytes.toString(tableName) + "....");
+ try {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.flush(tableName);
+ return true;
+ } catch (MasterNotRunningException e) {
+ LOG.error("Master not running.");
+ e.printStackTrace();
+ return false;
+ } catch (TableNotFoundException e) {
+ LOG.info("Table not found.");
+ return false;
+ } catch (IOException e) {
+ LOG.error("IO Exception.");
+ e.printStackTrace();
+ return false;
+ } catch (Exception e) {
+ LOG.error("Exception.");
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public static HServerAddress getMetaRS(HBaseConfiguration conf)
+ throws IOException {
+ HTable table = new HTable(conf, HConstants.META_TABLE_NAME);
+ HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(""));
+ return hloc.getServerAddress();
+ }
+
+ public static Configuration getHBaseConfFromZkNode(String zkNodeName) {
+ Configuration c = HBaseConfiguration.create();
+ if (zkNodeName != null) {
+ c.set("hbase.zookeeper.quorum", zkNodeName);
+ }
+ return c;
+ }
+}
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedAction.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedAction.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedAction.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,167 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.loadtest;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+
+public abstract class MultiThreadedAction
+{
+ private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
+ public int numThreads = 1;
+ public static byte[] tableName;
+ public float verifyPercent = 0;
+ public long startKey = 0;
+ public long endKey = 1;
+ public AtomicInteger numThreadsWorking = new AtomicInteger(0);
+ public AtomicLong numRows_ = new AtomicLong(0);
+ public AtomicLong numRowsVerified_ = new AtomicLong(0);
+ public AtomicLong numKeys_ = new AtomicLong(0);
+ public AtomicLong numErrors_ = new AtomicLong(0);
+ public AtomicLong numOpFailures_ = new AtomicLong(0);
+ public AtomicLong cumulativeOpTime_ = new AtomicLong(0);
+ private boolean verbose = false;
+ protected Random random = new Random();
+ public Configuration conf;
+ private boolean shouldRun = true;
+ private ColumnFamilyProperties[] familyProperties;
+ private boolean stopOnError = false;
+ public ProgressReporter currentProgressReporter = null;
+
+ public void startReporter(String id) {
+ currentProgressReporter = new ProgressReporter(id);
+ currentProgressReporter.start();
+ }
+
+ public boolean getStopOnError() {
+ return stopOnError;
+ }
+
+ public void setStopOnError(boolean stopOnError) {
+ this.stopOnError = stopOnError;
+ }
+
+ public class ProgressReporter extends Thread {
+
+ private String id_ = "";
+
+ public ProgressReporter(String id) {
+ id_ = id;
+ }
+
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ long reportingInterval = 5000;
+
+ long priorNumRows = 0;
+ long priorCumulativeOpTime = 0;
+
+ while(verbose && numThreadsWorking.get() != 0) {
+ String threadsLeft = "[" + id_ + ":" + numThreadsWorking.get() + "] ";
+ if(MultiThreadedAction.this.numRows_.get() == 0) {
+ LOG.info(threadsLeft + "Number of rows = 0");
+ }
+ else {
+ long numRowsNumber = numRows_.get();
+ long time = System.currentTimeMillis() - startTime;
+ long cumulativeOpTime = cumulativeOpTime_.get();
+
+ long numRowsDelta = numRowsNumber - priorNumRows;
+ long cumulativeOpTimeDelta = cumulativeOpTime - priorCumulativeOpTime;
+
+ LOG.info(threadsLeft + "Rows = " + numRowsNumber +
+ ", keys = " + DisplayFormatUtils.formatNumber(numKeys_.get()) +
+ ", time = " + DisplayFormatUtils.formatTime(time) +
+ ((numRowsNumber > 0 && time > 0)?
+ (" Overall: [" +
+ "keys/s = " + numRowsNumber*1000/time +
+ ", latency = " + cumulativeOpTime/numRowsNumber + " ms]")
+ : "") +
+ ((numRowsDelta > 0) ?
+ (" Current: [" +
+ "rows/s = " + numRowsDelta*1000/reportingInterval +
+ ", latency = " + cumulativeOpTimeDelta/numRowsDelta +
+ " ms]") : "") +
+ ((numRowsVerified_.get()>0)?(", verified = " +
+ numRowsVerified_.get()):"") +
+ ((numOpFailures_.get()>0)?(", FAILURES = " +
+ numOpFailures_.get()):"") +
+ ((numErrors_.get()>0)?(", ERRORS = " +
+ numErrors_.get()):"")
+ );
+ priorNumRows = numRowsNumber;
+ priorCumulativeOpTime = cumulativeOpTime;
+ }
+ try {
+ Thread.sleep(reportingInterval);
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
+ public boolean getVerbose() {
+ return this.verbose;
+ }
+
+ public void setColumnFamilyProperties(
+ ColumnFamilyProperties[] familyProperties) {
+ this.familyProperties = familyProperties;
+ }
+
+ public ColumnFamilyProperties[] getColumnFamilyProperties() {
+ return this.familyProperties;
+ }
+
+ public void setVerficationPercent(float verifyPercent) {
+ this.verifyPercent = verifyPercent;
+ }
+
+ public boolean shouldContinueRunning() {
+ return shouldRun;
+ }
+
+ /**
+ * This is an unsafe operation so avoid use.
+ */
+ public void killAllThreads() {
+ if (currentProgressReporter != null && currentProgressReporter.isAlive()) {
+ currentProgressReporter.stop();
+ }
+ }
+
+ public void pleaseStopRunning() {
+ shouldRun = false;
+ }
+
+ public abstract void start(long startKey, long endKey, int numThreads);
+}
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedReader.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedReader.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedReader.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,259 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.loadtest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class MultiThreadedReader extends MultiThreadedAction {
+ private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
+ Set<HBaseReader> readers_ = new HashSet<HBaseReader>();
+ private boolean writesHappeningInParallel_ = false;
+
+ public void setWriteHappeningInParallel() {
+ writesHappeningInParallel_ = true;
+ }
+
+ public boolean areWritesHappeningInParallel() {
+ return writesHappeningInParallel_;
+ }
+
+ public MultiThreadedReader(Configuration config, byte[] tableName) {
+ this.tableName = tableName;
+ this.conf = config;
+ }
+
+ public void start(long startKey, long endKey, int numThreads) {
+ this.startKey = startKey;
+ this.endKey = endKey;
+ this.numThreads = numThreads;
+
+ if (this.getVerbose()) {
+ LOG.info("Reading keys [" + this.startKey + ", " + this.endKey + ")");
+ }
+
+ long threadStartKey = this.startKey;
+ long threadEndKey = this.startKey;
+ for (int i = 0; i < this.numThreads; ++i) {
+ threadStartKey = (this.startKey == -1) ? -1 : threadEndKey;
+ threadEndKey = this.startKey + (i + 1) * (this.endKey - this.startKey)
+ / this.numThreads;
+ HBaseReader reader = new HBaseReader(this, i, threadStartKey,
+ threadEndKey);
+ readers_.add(reader);
+ }
+ numThreadsWorking.addAndGet(readers_.size());
+ for (HBaseReader reader : readers_) {
+ reader.start();
+ }
+ startReporter("R");
+ }
+
+ /**
+ * This is an unsafe operation so avoid use.
+ */
+ public void killAllThreads() {
+ for (HBaseReader reader : readers_) {
+ if (reader != null && reader.isAlive()) {
+ reader.stop();
+ }
+ }
+ super.killAllThreads();
+ }
+
+ public static class HBaseReader extends Thread {
+ int id_;
+ MultiThreadedReader reader_;
+ List<HTable> tables_ = new ArrayList<HTable>();
+ long startKey_;
+ long endKey_;
+
+ public HBaseReader(MultiThreadedReader reader, int id, long startKey,
+ long endKey) {
+ id_ = id;
+ reader_ = reader;
+ HTable table = HBaseUtils.getHTable(reader_.conf, tableName);
+ tables_.add(table);
+ startKey_ = startKey;
+ endKey_ = endKey;
+ }
+
+ public void run() {
+ if (reader_.getVerbose()) {
+ // LOG.info("Started thread #" + id_ + " for reads...");
+ }
+ boolean repeatQuery = false;
+ long start = 0;
+ long curKey = 0;
+
+ while (reader_.shouldContinueRunning()) {
+ if (!repeatQuery) {
+ if (reader_.areWritesHappeningInParallel()) {
+ // load test is running at the same time.
+ while (MultiThreadedWriter.insertedKeySet_.size() <= 0) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ int idx = reader_.random
+ .nextInt(MultiThreadedWriter.insertedKeySet_.size());
+ curKey = MultiThreadedWriter.insertedKeySet_.get(idx);
+ } else {
+ curKey = startKey_ + Math.abs(reader_.random.nextLong())
+ % (endKey_ - startKey_);
+ }
+ } else {
+ repeatQuery = false;
+ }
+ try {
+ if (reader_.getVerbose() && repeatQuery) {
+ LOG.info("[" + id_ + "] "
+ + (repeatQuery ? "RE-Querying" : "Querying") + " key = "
+ + curKey);
+ }
+ queryKey(curKey,
+ (reader_.random.nextInt(100) < reader_.verifyPercent));
+ } catch (IOException e) {
+ reader_.numOpFailures_.addAndGet(1);
+ LOG.debug("[" + id_ + "] FAILED read, key = " + (curKey + "") + ","
+ + "time = " + (System.currentTimeMillis() - start) + " ms");
+ repeatQuery = true;
+ }
+ }
+ reader_.numThreadsWorking.decrementAndGet();
+ }
+
+ public void queryKey(long rowKey, boolean verify) throws IOException {
+ for (HTable table : tables_) {
+ for (ColumnFamilyProperties familyProperty : reader_
+ .getColumnFamilyProperties()) {
+ //Get get = new Get(DataGenerator.paddedKey(rowKey).getBytes());
+ Get get = new Get(RegionSplitter.getHBaseKeyFromRowID(rowKey));
+ get.setMaxVersions(familyProperty.maxVersions);
+ get.addFamily(Bytes.toBytes(familyProperty.familyName));
+ Filter filter = DataGenerator.getFilter(rowKey, familyProperty);
+ get.setFilter(filter);
+ long start = System.currentTimeMillis();
+ Result result = table.get(get);
+ reader_.cumulativeOpTime_.addAndGet(System.currentTimeMillis()
+ - start);
+ reader_.numRows_.addAndGet(1);
+ reader_.numKeys_.addAndGet(result.size());
+ if (verify) {
+ KeyValue[] kvResult = result.raw();
+ TreeSet<KeyValue> kvExpectedFull = DataGenerator
+ .getSortedResultSet(rowKey, familyProperty);
+ TreeSet<KeyValue> kvExpectedFiltered = DataGenerator
+ .filterAndVersioningForSingleRowFamily(kvExpectedFull, filter,
+ familyProperty.maxVersions);
+ boolean verificationResult = verifyResultSetIgnoringDuplicates(
+ kvResult, kvExpectedFiltered);
+ if (verificationResult == false) {
+ reader_.numErrors_.addAndGet(1);
+ LOG.error("Error checking data for key = " + rowKey);
+ if (reader_.numErrors_.get() > 1) {
+ LOG.error("Aborting run -- found more than one error\n");
+ if (reader_.getStopOnError()) {
+ System.exit(-1);
+ }
+ }
+ }
+ reader_.numRowsVerified_.addAndGet(1);
+ }
+ }
+ }
+ }
+
+ boolean verifyResultSet(KeyValue[] kvResult, TreeSet<KeyValue> kvExpected) {
+ if (kvResult.length != kvExpected.size()) {
+ LOG.info("Expected size was: " + kvExpected.size() + " "
+ + "but result set was of size: " + kvResult.length);
+ return false;
+ }
+ int index = 0;
+ for (KeyValue kv : kvExpected) {
+ if (KeyValue.COMPARATOR.compare(kv, kvResult[index++]) != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ boolean verifyResultSetIgnoringDuplicates(KeyValue[] kvResult,
+ TreeSet<KeyValue> kvExpected) {
+ TreeSet<KeyValue> noDuplicateResultSet = new TreeSet<KeyValue>(
+ new KeyValue.KVComparator());
+ for (KeyValue kv : kvResult) {
+ noDuplicateResultSet.add(kv);
+ }
+ boolean isCorrect = noDuplicateResultSet.equals(kvExpected);
+
+ if (isCorrect == false) {
+ debugMismatchResults(noDuplicateResultSet, kvExpected);
+ }
+ return isCorrect;
+ }
+ }
+
+ static void debugMismatchResults(TreeSet<KeyValue> noDuplicateResultSet,
+ TreeSet<KeyValue> kvExpected) {
+ if (noDuplicateResultSet.size() != kvExpected.size()) {
+ LOG.info("Expected size was: " + kvExpected.size()
+ + " but result set was of size: " + noDuplicateResultSet.size());
+ }
+ Iterator<KeyValue> expectedIterator = kvExpected.iterator();
+ Iterator<KeyValue> returnedIterator = noDuplicateResultSet.iterator();
+ while (expectedIterator.hasNext() || returnedIterator.hasNext()) {
+ KeyValue expected = null;
+ KeyValue returned = null;
+ if (expectedIterator.hasNext()) {
+ expected = expectedIterator.next();
+ }
+ if (returnedIterator.hasNext()) {
+ returned = returnedIterator.next();
+ }
+ if (returned == null || expected == null) {
+ LOG.info("MISMATCH!! Expected was : " + expected + " but got "
+ + returned);
+ } else if (KeyValue.COMPARATOR.compare(expected, returned) != 0) {
+ LOG.info("MISMATCH!! Expected was : " + expected + " but got "
+ + returned);
+ } else {
+ LOG.info("Expected was : " + expected + " and got " + returned);
+ }
+ }
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedWriter.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedWriter.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/MultiThreadedWriter.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,203 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.loadtest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class MultiThreadedWriter extends MultiThreadedAction
+{
+ private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
+ Set<HBaseWriter> writers_ = new HashSet<HBaseWriter>();
+
+ /* This is the current key to be inserted by any thread. Each thread does an
+ atomic get and increment operation and inserts the current value. */
+ public static AtomicLong currentKey_ = null;
+ /* The sorted set of keys inserted by the writers */
+ public static List<Long> insertedKeySet_ = Collections.synchronizedList(
+ new ArrayList<Long>());
+ /* The sorted set of keys NOT inserted by the writers */
+ public static List<Long> failedKeySet_ = Collections.synchronizedList(
+ new ArrayList<Long>());
+ public boolean bulkLoad = true;
+
+
+ public MultiThreadedWriter(Configuration config, byte[] tableName) {
+ this.tableName = tableName;
+ this.conf = config;
+ }
+
+ public void start(long startKey, long endKey, int numThreads) {
+ this.startKey = startKey;
+ this.endKey = endKey;
+ this.numThreads = numThreads;
+ currentKey_ = new AtomicLong(this.startKey);
+
+ if(getVerbose()) {
+ LOG.debug("Inserting keys [" + this.startKey + ", " + this.endKey + ")");
+ }
+
+ for(int i = 0; i < this.numThreads; ++i) {
+ HBaseWriter writer = new HBaseWriter(this, i);
+ writers_.add(writer);
+ }
+ numThreadsWorking.addAndGet(writers_.size());
+ for(HBaseWriter writer : writers_) {
+ writer.start();
+ }
+ startReporter("W");
+ }
+
+
+ public void setBulkLoad(boolean bulkLoad) {
+ this.bulkLoad = bulkLoad;
+ }
+
+ public boolean getBulkLoad() {
+ return bulkLoad;
+ }
+
+ public void waitForFinish() {
+ while(numThreadsWorking.get() != 0) {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ System.out.println("Failed Key Count: " + failedKeySet_.size());
+ for (Long key : failedKeySet_) {
+ System.out.println("Failure for key: " + key);
+ }
+
+ }
+
+ /**
+ * This is an unsafe operation so avoid use.
+ */
+ public void killAllThreads() {
+ for (HBaseWriter writer: writers_) {
+ if (writer != null && writer.isAlive()) {
+ writer.stop();
+ }
+ }
+ super.killAllThreads();
+ }
+
+ public static class HBaseWriter extends Thread {
+ int id_;
+ MultiThreadedWriter writer_;
+ Random random_ = new Random();
+ List<HTable> tables_ = new ArrayList<HTable>();
+
+ public HBaseWriter(MultiThreadedWriter writer, int id) {
+ id_ = id;
+ this.writer_ = writer;
+ HTable table = HBaseUtils.getHTable(this.writer_.conf, tableName);
+ tables_.add(table);
+ }
+
+ public void run() {
+ long rowKey = currentKey_.getAndIncrement();
+ do {
+ if (writer_.getVerbose()) {
+ //LOG.info("Writing key: "+rowKey);
+ }
+ if (writer_.bulkLoad == true) {
+ bulkInsertKey(rowKey, writer_.getColumnFamilyProperties());
+ } else {
+ insertKeys(rowKey, writer_.getColumnFamilyProperties());
+ }
+ rowKey = currentKey_.getAndIncrement();
+ } while(writer_.shouldContinueRunning() && rowKey < writer_.endKey);
+ writer_.numThreadsWorking.decrementAndGet();
+ }
+
+ public void bulkInsertKey(long rowKey,
+ ColumnFamilyProperties[] familyProperties) {
+ Put put = DataGenerator.getPut(rowKey, familyProperties);
+ try {
+ long start = System.currentTimeMillis();
+ putIntoTables(put);
+ insertedKeySet_.add(rowKey);
+ writer_.numRows_.addAndGet(1);
+ writer_.numKeys_.addAndGet(put.size());
+ writer_.cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start);
+ }
+ catch (IOException e) {
+ writer_.numOpFailures_.addAndGet(1);
+ failedKeySet_.add(rowKey);
+ e.printStackTrace();
+ }
+ }
+
+ public void insertKeys(long rowKey,
+ ColumnFamilyProperties[] familyProperties) {
+ byte[] row = RegionSplitter.getHBaseKeyFromRowID(rowKey);
+ //LOG.info("Inserting row: "+Bytes.toString(row));
+ int insertedSize = 0;
+ try {
+ long start = System.currentTimeMillis();
+ for (ColumnFamilyProperties familyProperty : familyProperties) {
+ TreeSet<KeyValue> kvSet = DataGenerator.
+ getSortedResultSet(rowKey, familyProperty);
+ for (KeyValue kv: kvSet) {
+ Put put = new Put(row);
+ put.add(kv);
+ insertedSize ++;
+ putIntoTables(put);
+ }
+ }
+ insertedKeySet_.add(rowKey);
+ writer_.numRows_.addAndGet(1);
+ writer_.numKeys_.addAndGet(insertedSize);
+ writer_.cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start);
+ }
+ catch (IOException e) {
+ writer_.numOpFailures_.addAndGet(1);
+ failedKeySet_.add(rowKey);
+ e.printStackTrace();
+ }
+ }
+
+ public void putIntoTables(Put put) throws IOException {
+ for(HTable table : tables_) {
+ table.put(put);
+ }
+ }
+ }
+}
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/RegionSplitter.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,133 @@
+package org.apache.hadoop.hbase.loadtest;
+
+import java.math.BigInteger;
+import java.security.MessageDigest;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.lang.StringUtils;
+
+final class HashingSchemes
+{
+ public static final String SHA_1 = "SHA-1";
+ public static final String SHA1 = "SHA1";
+ public static final String MD5 = "MD5";
+}
+
+
+public class RegionSplitter {
+
+ private final static String MAXMD5 = "7FFFFFFF";
+ private final static int rowComparisonLength = MAXMD5.length();
+
+ /**
+ * Creates splits for the given hashingType.
+ * @param hashingType
+ * @param numberOfSplits
+ * @return Byte array of size (numberOfSplits-1) corresponding to the
+ * boundaries between splits.
+ * @throws NoSuchAlgorithmException if the algorithm is not supported by
+ * this splitter
+ */
+ public static byte[][] splitKeys(String hashingType, int numberOfSplits) {
+ if (hashingType.equals(HashingSchemes.MD5)) {
+ return splitKeysMD5(numberOfSplits);
+ } else {
+ throw new UnsupportedOperationException("This algorithm is not" +
+ " currently supported by this class");
+ }
+ }
+
+ /**
+ * Creates splits for MD5 hashing.
+ * @param numberOfSplits
+ * @return Byte array of size (numberOfSplits-1) corresponding to the
+ * boundaries between splits.
+ */
+ private static byte[][] splitKeysMD5(int numberOfSplits) {
+ BigInteger max = new BigInteger(MAXMD5, 16);
+ BigInteger[] bigIntegerSplits = split(max, numberOfSplits);
+ byte[][] byteSplits = convertToBytes(bigIntegerSplits);
+ return byteSplits;
+ }
+
+ /**
+ * Splits the given BigInteger into numberOfSplits parts
+ * @param maxValue
+ * @param numberOfSplits
+ * @return array of BigInteger which is of size (numberOfSplits-1)
+ */
+ private static BigInteger[] split(BigInteger maxValue, int numberOfSplits) {
+ BigInteger[] splits = new BigInteger[numberOfSplits-1];
+ BigInteger sizeOfEachSplit = maxValue.divide(BigInteger.
+ valueOf(numberOfSplits));
+ for (int i = 1; i < numberOfSplits; i++) {
+ splits[i-1] = sizeOfEachSplit.multiply(BigInteger.valueOf(i));
+ }
+ return splits;
+ }
+
+ /**
+ * Returns an array of bytes corresponding to an array of BigIntegers
+ * @param bigIntegers
+ * @return bytes corresponding to the bigIntegers
+ */
+ private static byte[][] convertToBytes(BigInteger[] bigIntegers) {
+ byte[][] returnBytes = new byte[bigIntegers.length][];
+ for (int i = 0; i < bigIntegers.length; i++) {
+ returnBytes[i] = convertToByte(bigIntegers[i]);
+ }
+ return returnBytes;
+ }
+
+ /**
+ * Returns the bytes corresponding to the BigInteger
+ * @param bigInteger
+ * @return byte corresponding to input BigInteger
+ */
+ private static byte[] convertToByte(BigInteger bigInteger) {
+ String bigIntegerString = bigInteger.toString(16);
+ bigIntegerString = StringUtils.leftPad(bigIntegerString,
+ rowComparisonLength, '0');
+ return Bytes.toBytes(bigIntegerString);
+ }
+
+ /////////////////////////////////////
+ /**Code for hashing*/
+ /////////////////////////////////////
+
+ public static byte[] getHBaseKeyFromRowID(long rowID) {
+ return getHBaseKeyFromEmail(rowID+"");
+ }
+
+ public static byte[] getHBaseKeyFromEmail(String email) {
+ String ret = hashToString(hash(email));
+ ret += ":" + email;
+ return Bytes.toBytes(ret);
+ }
+
+ public static String hashToString(BigInteger data) {
+ String ret = data.toString(16);
+ return "00000000000000000000000000000000".substring(ret.length()) + ret;
+ }
+
+ public static BigInteger hash(String data)
+ {
+ byte[] result = hash(HashingSchemes.MD5, data.getBytes());
+ BigInteger hash = new BigInteger(result);
+ return hash.abs();
+ }
+
+ public static byte[] hash(String type, byte[] data)
+ {
+ byte[] result = null;
+ try {
+ MessageDigest messageDigest = MessageDigest.getInstance(type);
+ result = messageDigest.digest(data);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+}
Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/Tester.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/Tester.java?rev=1181374&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/Tester.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/loadtest/Tester.java Tue Oct 11 02:03:05 2011
@@ -0,0 +1,441 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.loadtest;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.StringReader;
+import java.util.Properties;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class Tester {
+
+ public static String defaultTestClusterName = "VerificationTest_DummyTable";
+ public String inputFilename = "LoadTest.properties";
+
+ private boolean verbose = false;
+
+ int clusterTestTime = 60;
+ int regionsPerServer = 5;
+
+ public static final String dashedLine =
+ "-----------------------------------------------------------------------";
+
+ // global HBase configuration for the JVM - referenced by all classes.
+ private Configuration config;
+ // startup options
+ public static Options options = new Options();
+
+ // table name for the test
+ public byte[] tableName;
+ // column families used by the test
+ private static final Log LOG = LogFactory.getLog(Tester.class);
+
+ MultiThreadedReader reader;
+ MultiThreadedWriter writer;
+
+ ColumnFamilyProperties[] familyProperties;
+
+ public Tester(String inputFilename, String zkNodeName, String tableNameString) {
+ if (inputFilename != null) {
+ this.inputFilename = inputFilename;
+ }
+ this.config = HBaseUtils.getHBaseConfFromZkNode(zkNodeName);
+ LOG.info("Adding hbase.zookeeper.quorum = "
+ + config.get("hbase.zookeeper.quorum"));
+ if (tableNameString == null) {
+ tableNameString = defaultTestClusterName;
+ } else {
+ tableNameString = "VerificationTest_" + tableNameString;
+ }
+ this.tableName = Bytes.toBytes(tableNameString);
+ this.writer = new MultiThreadedWriter(this.config, tableName);
+ this.reader = new MultiThreadedReader(this.config, tableName);
+ }
+
+ long startKey;
+ long endKey;
+ int readerThreads;
+ int writerThreads;
+
+ public void readPropertiesFile() {
+ Properties properties = new Properties();
+ try {
+ File inputFile = new File(inputFilename);
+ if (inputFile.exists()) {
+ LOG.info("Found properties file at " + inputFile.getAbsolutePath()
+ + " so loading it..... ");
+ properties.load(new FileInputStream(inputFile));
+ } else {
+ LOG.info("Did not find properties file " + inputFilename
+ + " so using default properties..... ");
+ LOG.info("Properties is : \n"
+ + ColumnFamilyProperties.defaultColumnProperties);
+ properties.load(new StringReader(
+ ColumnFamilyProperties.defaultColumnProperties));
+ }
+
+ verbose =
+ Boolean.parseBoolean(properties.getProperty("Verbose", "False"));
+ readerThreads =
+ Integer.parseInt(properties.getProperty("ReaderThreads", "1"));
+ writerThreads =
+ Integer.parseInt(properties.getProperty("WriterThreads", "1"));
+ startKey = Long.parseLong(properties.getProperty("StartKey", "1"));
+ endKey = Long.parseLong(properties.getProperty("EndKey", "10"));
+ int numberOfFamilies =
+ Integer.parseInt(properties.getProperty("NumberOfFamilies", "1"));
+ int verifyPercent =
+ Integer.parseInt(properties.getProperty("VerifyPercent", "10"));
+ clusterTestTime =
+ Integer.parseInt(properties.getProperty("ClusterTestTime", "60"));
+ writer.setBulkLoad(Boolean.parseBoolean(properties.getProperty(
+ "BulkLoad", "False")));
+ regionsPerServer =
+ Integer.parseInt(properties.getProperty("RegionsPerServer"));
+
+ if (verbose == true) {
+ LOG.info("Reader Threads: " + readerThreads);
+ LOG.info("Writer Threads: " + writerThreads);
+ LOG.info("Number of Column Families: " + numberOfFamilies);
+ LOG.info("Key range: " + startKey + "..." + endKey);
+ LOG.info("VerifyPercent: " + verifyPercent);
+ LOG.info("ClusterTestTime: " + clusterTestTime);
+ LOG.info("BulkLoad: " + writer.getBulkLoad());
+ LOG.info("RegionsPerServer: " + regionsPerServer);
+ }
+
+ this.familyProperties = new ColumnFamilyProperties[numberOfFamilies];
+ for (int i = 0; i < numberOfFamilies; i++) {
+ familyProperties[i] = new ColumnFamilyProperties();
+ String columnPrefix = "CF" + (i + 1) + "_";
+ familyProperties[i].familyName =
+ properties.getProperty(columnPrefix + "Name", "Dummy " + (i + 1));
+ familyProperties[i].startTimestamp =
+ Integer.parseInt(properties.getProperty(columnPrefix
+ + "StartTimestamp", "1"));
+ familyProperties[i].endTimestamp =
+ Integer.parseInt(properties.getProperty(columnPrefix
+ + "EndTimestamp", "10"));
+ familyProperties[i].minColDataSize =
+ Integer.parseInt(properties.getProperty(columnPrefix
+ + "MinDataSize", "1"));
+ familyProperties[i].maxColDataSize =
+ Integer.parseInt(properties.getProperty(columnPrefix
+ + "MaxDataSize", "10"));
+ familyProperties[i].minColsPerKey =
+ Integer.parseInt(properties.getProperty(columnPrefix
+ + "MinColsPerKey", "1"));
+ familyProperties[i].maxColsPerKey =
+ Integer.parseInt(properties.getProperty(columnPrefix
+ + "MaxColsPerKey", "10"));
+ familyProperties[i].maxVersions =
+ Integer.parseInt(properties.getProperty(columnPrefix
+ + "MaxVersions", "10"));
+ familyProperties[i].filterType =
+ properties.getProperty(columnPrefix + "FilterType", "None");
+ familyProperties[i].bloomType =
+ properties.getProperty(columnPrefix + "BloomType", "None");
+ familyProperties[i].compressionType =
+ properties.getProperty(columnPrefix + "CompressionType", "None");
+ if (verbose == true) {
+ familyProperties[i].print();
+ }
+ }
+ writer.setColumnFamilyProperties(familyProperties);
+ reader.setColumnFamilyProperties(familyProperties);
+ reader.setVerficationPercent(verifyPercent);
+ reader.setVerbose(verbose);
+ writer.setVerbose(verbose);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.info("Error reading properties file... Aborting!!!");
+ System.exit(0);
+ }
+ }
+
+ public void loadData() {
+ createTableIfNotExists();
+ writer.setStopOnError(true);
+ reader.setWriteHappeningInParallel();
+ writer.start(startKey, endKey, writerThreads);
+ LOG.info("Started writing data...");
+ }
+
+ public void readData() {
+ createTableIfNotExists();
+ reader.setStopOnError(true);
+ reader.start(startKey, endKey, readerThreads);
+ LOG.info("Started reading data...");
+ }
+
+ public boolean createTableIfNotExists() {
+ if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info("Creating table if not exists................................");
+ }
+ return HBaseUtils.createTableIfNotExists(config, tableName,
+ familyProperties, regionsPerServer);
+ }
+
+ public boolean deleteTable() {
+ if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info("Deleting table if it exists......");
+ }
+ return HBaseUtils.deleteTable(config, tableName);
+ }
+
+ public void startAction(MultiThreadedAction action, String actionType,
+ int numberOfThreads) {
+ if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info("Starting " + actionType + " thread..........................");
+ }
+ action.start(startKey, endKey, numberOfThreads);
+ }
+
+ public boolean flushTable() {
+ if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info("Flushing table....");
+ }
+ return HBaseUtils.flushTable(config, tableName);
+ }
+
+ public boolean testCluster() {
+ int flushAfterStartInSeconds = clusterTestTime / 4;
+ int stopAfterFlushInSeconds = clusterTestTime / 4;
+ int checkAfterStopInSeconds = clusterTestTime / 4;
+ int deleteAfterCheckInSeconds = clusterTestTime / 16;
+
+ boolean tableCreated = createTableIfNotExists();
+
+ reader.setWriteHappeningInParallel();
+ startAction(writer, "Writer", writerThreads);
+ startAction(reader, "Reader", readerThreads);
+
+ sleep(flushAfterStartInSeconds);
+ boolean tableFlushed = flushTable();
+
+ sleep(stopAfterFlushInSeconds);
+ stopAction(writer, "Writer");
+ stopAction(reader, "Reader");
+
+ sleep(checkAfterStopInSeconds);
+ boolean writerStatus = checkActionStatus(writer, "Writer");
+ boolean readerStatus = checkActionStatus(reader, "Reader");
+
+ sleep(deleteAfterCheckInSeconds);
+ boolean tableDeleted = deleteTable();
+
+ boolean overall =
+ tableCreated && tableFlushed && writerStatus && readerStatus
+ && tableDeleted;
+
+ String passed = "Passed! :)";
+ String failed = "Failed :(";
+ LOG.info(dashedLine);
+ LOG.info("Summary of cluster test");
+ LOG.info(dashedLine);
+ LOG.info("Table creating: " + (tableCreated ? passed : failed));
+ LOG.info("Table flushing: " + (tableFlushed ? passed : failed));
+ LOG.info("Table Deleting: " + (tableDeleted ? passed : failed));
+ LOG.info("Writer status: " + (writerStatus ? passed : failed));
+ LOG.info("Reader status: " + (readerStatus ? passed : failed));
+ LOG.info(dashedLine);
+ LOG.info("Cluster test: " + (overall ? passed : failed));
+ LOG.info(dashedLine);
+
+ return overall;
+ }
+
+ public void sleep(int seconds) {
+ if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info("Sleeping for " + seconds + " seconds.... zzz");
+ }
+ try {
+ Thread.sleep(seconds * 1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void stopAction(MultiThreadedAction action, String actionType) {
+ action.pleaseStopRunning();
+ if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info(actionType + " was running with "
+ + action.numThreadsWorking.get() + " threads.");
+ LOG.info("Stopping" + actionType + "thread............................");
+ }
+ }
+
+ public boolean checkActionStatus(MultiThreadedAction action, String actionType) {
+ boolean workingProperly = true;
+ if (action.numErrors_.get() > 0) {
+ LOG.info(dashedLine);
+ LOG.info("PROBLEM: " + actionType + " has " + action.numErrors_.get()
+ + " errors");
+ workingProperly = false;
+ } else if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info(actionType + " has no errors");
+ }
+
+ if (action.numOpFailures_.get() > 0) {
+ LOG.info(dashedLine);
+ LOG.info("PROBLEM: " + actionType + " has " + action.numOpFailures_.get()
+ + " op failures");
+ workingProperly = false;
+ } else if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info(actionType + " has no op failures");
+ }
+
+ if (action.numRows_.get() <= 0) {
+ LOG.info(dashedLine);
+ LOG.info("PROBLEM: " + actionType + " has not processed any keys.");
+ workingProperly = false;
+ } else if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info(actionType + " has processed keys.");
+ }
+
+ if (action.numThreadsWorking.get() != 0) {
+ LOG.info(dashedLine);
+ LOG.info("PROBLEM: " + actionType + " has not stopped yet. "
+ + action.numThreadsWorking.get() + " threads were still running"
+ + " so will kill them.");
+ workingProperly = false;
+ action.killAllThreads();
+ } else if (verbose) {
+ LOG.info(dashedLine);
+ LOG.info(actionType + " has stopped running");
+ }
+ return workingProperly;
+ }
+
+ public static void main(String[] args) {
+ try {
+ CommandLine cmd = initAndParseArgs(args);
+ if (cmd.hasOption(OPT_HELP)) {
+ printUsage();
+ return;
+ }
+ String inputFilename = cmd.getOptionValue(OPT_INPUT_FILENAME);
+ String zkNodeName = cmd.getOptionValue(OPT_ZKNODE);
+ String tableName = cmd.getOptionValue(OPT_TABLE_NAME);
+ Tester hBaseTest = new Tester(inputFilename, zkNodeName, tableName);
+ hBaseTest.readPropertiesFile();
+
+ if (cmd.hasOption(OPT_DELETE_TABLE)) {
+ hBaseTest.deleteTable();
+ }
+
+ if (cmd.hasOption(OPT_LOAD)) {
+ hBaseTest.loadData();
+ } else if (cmd.hasOption(OPT_READ)) {
+ hBaseTest.readData();
+ } else if (cmd.hasOption(OPT_LOADREAD)) {
+ hBaseTest.loadData();
+ hBaseTest.readData();
+ } else if (cmd.hasOption(OPT_TEST_CLUSTER)
+ || !cmd.hasOption(OPT_DELETE_TABLE)) {
+ hBaseTest.deleteTable();
+ boolean clusterStatus = hBaseTest.testCluster();
+ if (clusterStatus) {
+ System.exit(1);
+ } else {
+ System.exit(-1);
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage();
+ }
+ }
+
+ private static String USAGE;
+ private static final String HEADER = "HBaseTest";
+ private static final String FOOTER =
+ "\nCalling this wihout any arguments"
+ + " will start a short verification test with default table name and"
+ + " current machine as zookeeper. "
+ + "\n To specify a different zk use the -zk option.\n";
+ private static final String OPT_ZKNODE = "zk";
+ private static final String OPT_LOAD = "load";
+ private static final String OPT_READ = "read";
+ private static final String OPT_LOADREAD = "loadread";
+ private static final String OPT_DELETE_TABLE = "deletetable";
+ private static final String OPT_TEST_CLUSTER = "testcluster";
+ private static final String OPT_TABLE_NAME = "tn";
+ private static final String OPT_INPUT_FILENAME = "inputfile";
+ private static final String OPT_HELP = "help";
+
+ static CommandLine initAndParseArgs(String[] args) throws ParseException {
+ // set the usage object
+ USAGE =
+ "bin/hbase org.apache.hadoop.hbase.loadtest.Tester " + " -"
+ + OPT_ZKNODE + " <Zookeeper node>" + " -" + OPT_TABLE_NAME
+ + " <Table name>" + " -" + OPT_LOAD + " -" + OPT_READ + " -"
+ + OPT_TEST_CLUSTER + " -" + OPT_HELP + " -" + OPT_INPUT_FILENAME;
+ // add options
+ options.addOption(OPT_HELP, false, "Help");
+ options.addOption(OPT_ZKNODE, true, "Zookeeper node in the HBase cluster"
+ + " (optional)");
+ options.addOption(OPT_TABLE_NAME, true,
+ "The name of the table to be read or write (optional)");
+ options.addOption(OPT_INPUT_FILENAME, true,
+ "Path to input configuration file (optional)");
+ options.addOption(OPT_LOAD, false, "Command to load Data");
+ options.addOption(OPT_READ, false, "Command to read Data assuming all"
+ + " required data had been previously loaded to the table");
+ options.addOption(OPT_LOADREAD, false, "Command to load and read Data");
+ options.addOption(OPT_DELETE_TABLE, false, "Command to delete table before"
+ + " testing it");
+ options.addOption(OPT_TEST_CLUSTER, false, "Command to run a short"
+ + " verification test on cluster."
+ + " This also deletes the table if it exists before running the test");
+ // parse the passed in options
+ CommandLineParser parser = new BasicParser();
+ CommandLine cmd = parser.parse(options, args);
+ return cmd;
+ }
+
+ private static void printUsage() {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(80);
+ helpFormatter.printHelp(USAGE, HEADER, options, FOOTER);
+ }
+}
\ No newline at end of file