Posted to commits@hbase.apache.org by ap...@apache.org on 2008/11/19 01:43:29 UTC

svn commit: r718822 - in /hadoop/hbase/branches/0.19_on_hadoop_0.18: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/mapred/ src/java/org/apache/hadoop/hbase/regionserver/ src/...

Author: apurtell
Date: Tue Nov 18 16:43:29 2008
New Revision: 718822

URL: http://svn.apache.org/viewvc?rev=718822&view=rev
Log:
merged to trunk (revision 718820)

Modified:
    hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HStoreKey.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/package-info.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt Tue Nov 18 16:43:29 2008
@@ -69,7 +69,7 @@
    HBASE-951   Either shut down master or let it finish cleanup
    HBASE-964, HBASE-678 provide for safe-mode without locking up HBase "waiting
                for root region"
-   HBASE-990   NoSuchElementException in flushSomeRegions
+   HBASE-990   NoSuchElementException in flushSomeRegions; took two attempts.
    HBASE-602   HBase Crash when network card has a IPv6 address
    HBASE-996   Migration script to up the versions in catalog tables
    HBASE-991   Update the mapred package document examples so they work with
@@ -124,6 +124,10 @@
    HBASE-999   Up versions on historian and keep history of deleted regions for a
                while rather than delete immediately
    HBASE-938   Major compaction period is not checked periodically
+   HBASE-947   [Optimization] Major compaction should remove deletes as well as
+               the deleted cell
+   HBASE-675   Report correct server hosting a table split for assignment to
+               MR jobs
         
   NEW FEATURES
    HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HStoreKey.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HStoreKey.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HStoreKey.java Tue Nov 18 16:43:29 2008
@@ -529,9 +529,6 @@
       if(rowCompare == 0)
         rowCompare = Bytes.compareTo(keysA[1], KeysB[1]);
       return rowCompare;
-    } 
-    if (regionInfo != null && regionInfo.getTableDesc().getRowKeyComparator() != null) {
-      return regionInfo.getTableDesc().getRowKeyComparator().compare(rowA, rowB);
     }
     return Bytes.compareTo(rowA, rowB);
   }

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HTableDescriptor.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/HTableDescriptor.java Tue Nov 18 16:43:29 2008
@@ -19,12 +19,8 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +32,6 @@
 import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -74,8 +69,6 @@
     new ImmutableBytesWritable(Bytes.toBytes(IS_ROOT));
   public static final String IS_META = "IS_META";
 
-  public static final String ROW_KEY_COMPARATOR = "ROW_KEY_COMPARATOR";
-
   public static final ImmutableBytesWritable IS_META_KEY =
     new ImmutableBytesWritable(Bytes.toBytes(IS_META));
 
@@ -431,47 +424,7 @@
     setValue(MEMCACHE_FLUSHSIZE_KEY,
       Bytes.toBytes(Integer.toString(memcacheFlushSize)));
   }
-  
-  
-  public void setRowKeyComparator(WritableComparator<byte[]> newComparator) {
-    if (newComparator == null) {
-      return;
-    }
     
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(bos);
-    HBaseConfiguration conf = new HBaseConfiguration();
-    try {
-      ObjectWritable.writeObject(dos, newComparator, WritableComparator.class, conf);
-      dos.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    setValue(ROW_KEY_COMPARATOR.getBytes(), bos.toByteArray());
-    this.comparator = newComparator;
-  }
-  
-  private WritableComparator<byte[]> comparator = null;
-  public WritableComparator<byte[]> getRowKeyComparator() {
-    if (comparator != null) {
-      return comparator;
-    }
-    byte[] bytes = getValue(ROW_KEY_COMPARATOR.getBytes());
-    if (bytes == null) {
-      return null;
-    }
-    ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-    DataInputStream in = new DataInputStream(bis);
-    HBaseConfiguration conf = new HBaseConfiguration();
-    try {
-      comparator = (WritableComparator<byte[]>) ObjectWritable.readObject(in, conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return comparator;
-  }
-  
-  
   public Collection<IndexSpecification> getIndexes() {
     return indexes.values();
   }
@@ -708,4 +661,4 @@
           new HColumnDescriptor(HConstants.COLUMN_FAMILY_HISTORIAN,
             HConstants.ALL_VERSIONS, HColumnDescriptor.CompressionType.NONE,
             false, false, Integer.MAX_VALUE, HConstants.WEEK_IN_SECONDS, false)});
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java Tue Nov 18 16:43:29 2008
@@ -26,6 +26,8 @@
  */
 public class IndexNotFoundException extends IOException {
 
+  private static final long serialVersionUID = 6533971528557000965L;
+
   public IndexNotFoundException() {
     super();
   }

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java Tue Nov 18 16:43:29 2008
@@ -24,11 +24,9 @@
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.WritableComparator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /** Holds the specification for a single secondary index. */
 public class IndexSpecification implements Writable {
@@ -39,8 +37,6 @@
   // Constructs the
   private IndexKeyGenerator keyGenerator;
 
-  private WritableComparator<byte[]> keyComparator;
-
   // Additional columns mapped into the indexed row. These will be available for
   // filters when scanning the index.
   private byte[][] additionalColumns;
@@ -51,11 +47,9 @@
   private String indexId;
 
   /** Construct an "simple" index spec for a single column. */
-  public IndexSpecification(String indexId, byte[] indexedColumn,
-      boolean acending) {
+  public IndexSpecification(String indexId, byte[] indexedColumn) {
     this(indexId, new byte[][] { indexedColumn }, null,
-        new SimpleIndexKeyGenerator(indexedColumn), acending == true ? null
-            : new ReverseByteArrayComparator());
+        new SimpleIndexKeyGenerator(indexedColumn));
   }
 
   /**
@@ -68,13 +62,11 @@
    * @param keyComparator
    */
   public IndexSpecification(String indexId, byte[][] indexedColumns,
-      byte[][] additionalColumns, IndexKeyGenerator keyGenerator,
-      WritableComparator<byte[]> keyComparator) {
+      byte[][] additionalColumns, IndexKeyGenerator keyGenerator) {
     this.indexId = indexId;
     this.indexedColumns = indexedColumns;
     this.additionalColumns = additionalColumns;
     this.keyGenerator = keyGenerator;
-    this.keyComparator = keyComparator;
     this.makeAllColumns();
   }
 
@@ -111,15 +103,6 @@
   }
 
   /**
-   * Get the keyComparator.
-   * 
-   * @return Return the keyComparator.
-   */
-  public WritableComparator<byte[]> getKeyComparator() {
-    return keyComparator;
-  }
-
-  /**
    * Get the additionalColumns.
    * 
    * @return Return the additionalColumns.
@@ -171,8 +154,6 @@
     makeAllColumns();
     HBaseConfiguration conf = new HBaseConfiguration();
     keyGenerator = (IndexKeyGenerator) ObjectWritable.readObject(in, conf);
-    keyComparator = (WritableComparator<byte[]>) ObjectWritable.readObject(in,
-        conf);
   }
 
   /** {@inheritDoc} */
@@ -193,8 +174,6 @@
     HBaseConfiguration conf = new HBaseConfiguration();
     ObjectWritable
         .writeObject(out, keyGenerator, IndexKeyGenerator.class, conf);
-    ObjectWritable.writeObject(out, keyComparator, WritableComparable.class,
-        conf);
   }
 
 }

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java Tue Nov 18 16:43:29 2008
@@ -92,8 +92,6 @@
       indexTableDesc.addFamily(new HColumnDescriptor(colFamily));
     }
 
-    indexTableDesc.setRowKeyComparator(indexSpec.getKeyComparator());
-
     return indexTableDesc;
   }
 }

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html Tue Nov 18 16:43:29 2008
@@ -26,9 +26,8 @@
 The IndexSpecification class provides the metadata for the index. This includes:
 <li> the columns that contribute to the index key,
 <li> additional columns to put in the index table (and are thus made available to filters on the index table),
-<li> an IndexKeyGenerator which constructs the index-row-key from the indexed column(s) and the original row, 
 <br> and 
-<li> (optionally) a custom key comparator for the indexed table. This can allow an index on a deserialized column value.
+<li> an IndexKeyGenerator which constructs the index-row-key from the indexed column(s) and the original row.
 
 IndexesSpecifications can be added to a table's metadata (HTableDescriptor) before the table is constructed. 
 Afterwards, updates and deletes to the original table will trigger the updates in the index, and 
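
A minimal usage sketch of the simplified indexed-table API after this change
(table, family, and column names are invented; the two-argument
IndexSpecification constructor and HTableDescriptor.addIndex() mirror the
TestIndexedTable changes in this commit, while IndexedTableAdmin.createTable()
is assumed to behave like HBaseAdmin.createTable()):

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class CreateIndexedTableExample {
  public static void main(String[] args) throws IOException {
    HBaseConfiguration conf = new HBaseConfiguration();

    // Describe the base table and its single column family.
    HTableDescriptor desc = new HTableDescriptor("orders");
    desc.addFamily(new HColumnDescriptor(Bytes.toBytes("family:")));

    // Index a single column.  With the custom key comparator removed, the
    // index table is always ordered by the plain lexicographic byte order
    // of the generated index row key.
    IndexSpecification byColA =
      new IndexSpecification("byColA", Bytes.toBytes("family:a"));
    desc.addIndex(byColA);

    // Creating the base table also creates and wires up the index table.
    IndexedTableAdmin admin = new IndexedTableAdmin(conf);
    admin.createTable(desc);
  }
}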

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableInputFormatBase.java Tue Nov 18 16:43:29 2008
@@ -263,7 +263,7 @@
    * {@link InputSplit} array.
    *
    * @param job the map task {@link JobConf}
-   * @param numSplits a hint to calculate the number of splits
+   * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
    *
    * @return the input splits
    *
@@ -280,24 +280,23 @@
     if (this.inputColumns == null || this.inputColumns.length == 0) {
       throw new IOException("Expecting at least one column");
     }
-    int realNumSplits = numSplits > startKeys.length ? startKeys.length
-        : numSplits;
+    int realNumSplits = numSplits > startKeys.length? startKeys.length:
+      numSplits;
     InputSplit[] splits = new InputSplit[realNumSplits];
     int middle = startKeys.length / realNumSplits;
     int startPos = 0;
     for (int i = 0; i < realNumSplits; i++) {
       int lastPos = startPos + middle;
       lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
+      String regionLocation = table.getRegionLocation(startKeys[startPos]).
+        getServerAddress().getHostname(); 
       splits[i] = new TableSplit(this.table.getTableName(),
-          startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]
-              : HConstants.EMPTY_START_ROW);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("split: " + i + "->" + splits[i]);
-      }
+        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
+          HConstants.EMPTY_START_ROW, regionLocation);
+      LOG.info("split: " + i + "->" + splits[i]);
       startPos = lastPos;
     }
     return splits;
-
   }
 
   /**

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/TableSplit.java Tue Nov 18 16:43:29 2008
@@ -30,15 +30,16 @@
 /**
  * A table split corresponds to a key range [low, high)
  */
-public class TableSplit implements InputSplit {
+public class TableSplit implements InputSplit, Comparable {
   private byte [] m_tableName;
   private byte [] m_startRow;
   private byte [] m_endRow;
+  private String m_regionLocation;
 
   /** default constructor */
   public TableSplit() {
     this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
-      HConstants.EMPTY_BYTE_ARRAY);
+      HConstants.EMPTY_BYTE_ARRAY, "");
   }
 
   /**
@@ -46,53 +47,67 @@
    * @param tableName
    * @param startRow
    * @param endRow
+   * @param location
    */
-  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow) {
-    m_tableName = tableName;
-    m_startRow = startRow;
-    m_endRow = endRow;
+  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+      final String location) {
+    this.m_tableName = tableName;
+    this.m_startRow = startRow;
+    this.m_endRow = endRow;
+    this.m_regionLocation = location;
   }
 
   /** @return table name */
   public byte [] getTableName() {
-    return m_tableName;
+    return this.m_tableName;
   }
 
   /** @return starting row key */
   public byte [] getStartRow() {
-    return m_startRow;
+    return this.m_startRow;
   }
 
   /** @return end row key */
   public byte [] getEndRow() {
-    return m_endRow;
+    return this.m_endRow;
   }
 
-  public long getLength() {
-    // Not clear how to obtain this... seems to be used only for sorting splits
-    return 0;
+  /** @return the region's hostname */
+  public String getRegionLocation() {
+    return this.m_regionLocation;
   }
 
   public String[] getLocations() {
-    // Return a random node from the cluster for now
-    return new String[] { };
+    return new String[] {this.m_regionLocation};
+  }
+
+  public long getLength() {
+    // Not clear how to obtain this... seems to be used only for sorting splits
+    return 0;
   }
 
   public void readFields(DataInput in) throws IOException {
     this.m_tableName = Bytes.readByteArray(in);
     this.m_startRow = Bytes.readByteArray(in);
     this.m_endRow = Bytes.readByteArray(in);
+    this.m_regionLocation = Bytes.toString(Bytes.readByteArray(in));
   }
 
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.m_tableName);
     Bytes.writeByteArray(out, this.m_startRow);
     Bytes.writeByteArray(out, this.m_endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(this.m_regionLocation));
   }
 
   @Override
   public String toString() {
-    return Bytes.toString(m_tableName) +"," + Bytes.toString(m_startRow) +
-      "," + Bytes.toString(m_endRow);
+    return m_regionLocation + ":" +
+      Bytes.toString(m_startRow) + "," + Bytes.toString(m_endRow);
+  }
+
+  public int compareTo(Object arg) {
+    TableSplit other = (TableSplit)arg;
+    return Bytes.compareTo(getStartRow(), other.getStartRow());
   }
-}
\ No newline at end of file
+}
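
For context, a small sketch (host and table names invented; only the
TableSplit API visible in the diff above is used) of what the new fields buy
us: getLocations() now reports the hosting region server, which the JobTracker
uses as a locality hint when placing map tasks, and the new compareTo() orders
splits by start row:

import java.util.Arrays;

import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;

public class TableSplitExample {
  public static void main(String[] args) {
    TableSplit[] splits = new TableSplit[] {
      new TableSplit(Bytes.toBytes("orders"), Bytes.toBytes("mmm"),
        Bytes.toBytes("zzz"), "rs2.example.com"),
      new TableSplit(Bytes.toBytes("orders"), Bytes.toBytes("aaa"),
        Bytes.toBytes("mmm"), "rs1.example.com"),
    };

    // Each split advertises its region's host so the map task can be
    // scheduled on the TaskTracker co-located with that region server.
    for (TableSplit split : splits) {
      System.out.println(split + " -> " + Arrays.toString(split.getLocations()));
    }

    // Comparable orders splits by start row (the "aaa" split sorts first).
    Arrays.sort(splits);
    System.out.println("first split after sort: " + splits[0]);
  }
}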

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/package-info.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/package-info.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/mapred/package-info.java Tue Nov 18 16:43:29 2008
@@ -85,7 +85,10 @@
 specify source/sink table and column names in your configuration.</p>
 
 <p>Reading from hbase, the TableInputFormat asks hbase for the list of
-regions and makes a map-per-region.  
+regions and makes a map-per-region or <code>mapred.map.tasks</code> maps,
+whichever is smaller (if your job only has two maps, raise mapred.map.tasks
+to a number greater than the number of regions). Maps will run on the adjacent
+TaskTracker if you are running a TaskTracker and RegionServer per node.
 Writing, it may make sense to avoid the reduce step and write yourself back into
 hbase from inside your map. You'd do this when your job does not need the sort
 and collation that mapreduce does on the map emitted data; on insert,
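
To make "map-per-region or mapred.map.tasks maps, whichever is smaller"
concrete, here is a self-contained sketch of the arithmetic that getSplits()
(shown earlier in this commit) applies; the region and split counts are
invented for illustration:

public class SplitMathExample {
  public static void main(String[] args) {
    int numRegions = 10;   // regions in the table (start keys)
    int numSplits = 4;     // hint from mapred.map.tasks

    // Never create more splits than there are regions.
    int realNumSplits = numSplits > numRegions ? numRegions : numSplits;
    int middle = numRegions / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      // Spread the remainder regions over the first few splits.
      lastPos = numRegions % realNumSplits > i ? lastPos + 1 : lastPos;
      System.out.println("split " + i + " covers regions [" + startPos + ", "
        + lastPos + ")");   // prints [0,3), [3,6), [6,8), [8,10)
      startPos = lastPos;
    }
  }
}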

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Nov 18 16:43:29 2008
@@ -435,7 +435,6 @@
       curfile = new HStoreFile(conf, fs, basedir, this.info,
         family.getName(), fid, reference);
       long storeSeqId = -1;
-      boolean majorCompaction = false;
       try {
         storeSeqId = curfile.loadInfo(fs);
         if (storeSeqId > this.maxSeqId) {
@@ -1044,11 +1043,45 @@
   }
   
   /*
+   * @param r List to reverse
+   * @return A reversed array of content of <code>readers</code>
+   */
+  private MapFile.Reader [] reverse(final List<MapFile.Reader> r) {
+    List<MapFile.Reader> copy = new ArrayList<MapFile.Reader>(r);
+    Collections.reverse(copy);
+    return copy.toArray(new MapFile.Reader[0]);
+  }
+
+  /*
+   * @param rdrs List of readers
+   * @param keys Current keys
+   * @param done Which readers are done
+   * @return The lowest current key in passed <code>rdrs</code>
+   */
+  private int getLowestKey(final MapFile.Reader [] rdrs,
+      final HStoreKey [] keys, final boolean [] done) {
+    int lowestKey = -1;
+    for (int i = 0; i < rdrs.length; i++) {
+      if (done[i]) {
+        continue;
+      }
+      if (lowestKey < 0) {
+        lowestKey = i;
+      } else {
+        if (keys[i].compareTo(keys[lowestKey]) < 0) {
+          lowestKey = i;
+        }
+      }
+    }
+    return lowestKey;
+  }
+
+  /*
    * Compact a list of MapFile.Readers into MapFile.Writer.
    * 
-   * We work by iterating through the readers in parallel. We always increment
-   * the lowest-ranked one. Updates to a single row/column will appear ranked
-   * by timestamp.
+   * We work by iterating through the readers in parallel looking at newest
+   * store file first. We always increment the lowest-ranked one. Updates to a
+   * single row/column will appear ranked by timestamp.
    * @param compactedOut Where to write compaction.
    * @param pReaders List of readers sorted oldest to newest.
    * @param majorCompaction True to force a major compaction regardless of
@@ -1058,14 +1091,12 @@
   private void compact(final MapFile.Writer compactedOut,
       final List<MapFile.Reader> pReaders, final boolean majorCompaction)
   throws IOException {
-    // Reverse order so newest is first.
-    List<MapFile.Reader> copy = new ArrayList<MapFile.Reader>(pReaders);
-    Collections.reverse(copy);
-    MapFile.Reader[] rdrs = copy.toArray(new MapFile.Reader[0]);
+    // Reverse order so newest store file is first.
+    MapFile.Reader[] rdrs = reverse(pReaders);
     try {
-      HStoreKey[] keys = new HStoreKey[rdrs.length];
-      ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
-      boolean[] done = new boolean[rdrs.length];
+      HStoreKey [] keys = new HStoreKey[rdrs.length];
+      ImmutableBytesWritable [] vals = new ImmutableBytesWritable[rdrs.length];
+      boolean [] done = new boolean[rdrs.length];
       for(int i = 0; i < rdrs.length; i++) {
         keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.info);
         vals[i] = new ImmutableBytesWritable();
@@ -1085,56 +1116,67 @@
 
       long now = System.currentTimeMillis();
       int timesSeen = 0;
-      byte [] lastRow = null;
-      byte [] lastColumn = null;
+      HStoreKey lastSeen = new HStoreKey();
+      HStoreKey lastDelete = null;
       while (numDone < done.length) {
-        int smallestKey = -1;
-        for (int i = 0; i < rdrs.length; i++) {
-          if (done[i]) {
-            continue;
-          }
-          if (smallestKey < 0) {
-            smallestKey = i;
-          } else {
-            if (keys[i].compareTo(keys[smallestKey]) < 0) {
-              smallestKey = i;
-            }
-          }
-        }
-        HStoreKey sk = keys[smallestKey];
-        if (HStoreKey.equalsTwoRowKeys(info,lastRow, sk.getRow())
-            && Bytes.equals(lastColumn, sk.getColumn())) {
+        // Get lowest key in all store files.
+        int lowestKey = getLowestKey(rdrs, keys, done);
+        HStoreKey sk = keys[lowestKey];
+        // If its same row and column as last key, increment times seen.
+        if (HStoreKey.equalsTwoRowKeys(info, lastSeen.getRow(), sk.getRow())
+            && Bytes.equals(lastSeen.getColumn(), sk.getColumn())) {
           timesSeen++;
+          // Reset last delete if not exact timestamp -- lastDelete only stops
+          // exactly the same key making it out to the compacted store file.
+          if (lastDelete != null &&
+              lastDelete.getTimestamp() != sk.getTimestamp()) {
+            lastDelete = null;
+          }
         } else {
           timesSeen = 1;
+          lastDelete = null;
         }
 
         // Don't write empty rows or columns.  Only remove cells on major
         // compaction.  Remove if expired of > VERSIONS
         if (sk.getRow().length != 0 && sk.getColumn().length != 0) {
-          boolean expired = false;
-          if (!majorCompaction ||
-              (timesSeen <= family.getMaxVersions() &&
-                !(expired = isExpired(sk, ttl, now)))) {
-              compactedOut.append(sk, vals[smallestKey]);
-          }
-          if (expired) {
-            // HBASE-855 remove one from timesSeen because it did not make it
-            // past expired check -- don't count against max versions.
-            timesSeen--;
+          ImmutableBytesWritable value = vals[lowestKey];
+          if (!majorCompaction) {
+            // Write out all values if not a major compaction.
+            compactedOut.append(sk, value);
+          } else {
+            boolean expired = false;
+            boolean deleted = false;
+            if (timesSeen <= family.getMaxVersions() &&
+                !(expired = isExpired(sk, ttl, now))) {
+              // If this value key is same as a deleted key, skip
+              if (lastDelete != null && sk.equals(lastDelete)) {
+                deleted = true;
+              } else if (HLogEdit.isDeleted(value.get())) {
+                // If a deleted value, skip
+                deleted = true;
+                lastDelete = new HStoreKey(sk);
+              } else {
+                compactedOut.append(sk, vals[lowestKey]);
+              }
+            }
+            if (expired || deleted) {
+              // HBASE-855 remove one from timesSeen because it did not make it
+              // past expired check -- don't count against max versions.
+              timesSeen--;
+            }
           }
         }
 
         // Update last-seen items
-        lastRow = sk.getRow();
-        lastColumn = sk.getColumn();
+        lastSeen = new HStoreKey(sk);
 
         // Advance the smallest key.  If that reader's all finished, then 
         // mark it as done.
-        if (!rdrs[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
-          done[smallestKey] = true;
-          rdrs[smallestKey].close();
-          rdrs[smallestKey] = null;
+        if (!rdrs[lowestKey].next(keys[lowestKey], vals[lowestKey])) {
+          done[lowestKey] = true;
+          rdrs[lowestKey].close();
+          rdrs[lowestKey] = null;
           numDone++;
         }
       }
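
A simplified, self-contained sketch of the major-compaction merge introduced
here for HBASE-947: store files are scanned newest-first, the lowest current
key is taken at each step, and on a major compaction both the delete marker
and the exact cell it deletes are dropped.  The Cell type and the string keys
are invented for illustration, and max-versions/TTL pruning is omitted; the
real code works on HStoreKey and MapFile.Reader as shown above.

import java.util.ArrayList;
import java.util.List;

public class MajorCompactionSketch {

  /** A cell is a "row:column:timestamp" key plus a value or a delete marker. */
  static class Cell implements Comparable<Cell> {
    final String key;     // e.g. "row1:colA:5"
    final String value;   // null means "this is a delete marker"
    Cell(String key, String value) { this.key = key; this.value = value; }
    boolean isDelete() { return value == null; }
    public int compareTo(Cell other) { return key.compareTo(other.key); }
    public String toString() { return key + "=" + value; }
  }

  /** Merge per-file cell lists (each sorted by key), newest file first. */
  static List<Cell> majorCompact(List<List<Cell>> newestFirstFiles) {
    List<Cell> out = new ArrayList<Cell>();
    int[] pos = new int[newestFirstFiles.size()];
    String lastDelete = null;   // key of the most recent delete marker seen
    while (true) {
      // Pick the file whose current cell has the lowest key; ties go to the
      // newest file, mirroring the newest-first reader ordering.
      int lowest = -1;
      for (int i = 0; i < newestFirstFiles.size(); i++) {
        if (pos[i] >= newestFirstFiles.get(i).size()) continue;
        Cell c = newestFirstFiles.get(i).get(pos[i]);
        if (lowest < 0
            || c.compareTo(newestFirstFiles.get(lowest).get(pos[lowest])) < 0) {
          lowest = i;
        }
      }
      if (lowest < 0) break;    // every file is exhausted
      Cell c = newestFirstFiles.get(lowest).get(pos[lowest]);
      pos[lowest]++;
      if (c.isDelete()) {
        lastDelete = c.key;     // remember it, but never write it out
      } else if (!c.key.equals(lastDelete)) {
        out.add(c);             // survives the major compaction
      }                         // else: exactly this key was deleted, skip it
    }
    return out;
  }

  public static void main(String[] args) {
    List<Cell> newer = new ArrayList<Cell>();
    newer.add(new Cell("row1:colA:5", null));     // delete marker
    List<Cell> older = new ArrayList<Cell>();
    older.add(new Cell("row1:colA:3", "kept"));   // different key, not touched
    older.add(new Cell("row1:colA:5", "stale"));  // exact key that was deleted
    List<List<Cell>> files = new ArrayList<List<Cell>>();
    files.add(newer);
    files.add(older);
    System.out.println(majorCompact(files));      // prints [row1:colA:3=kept]
  }
}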

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java Tue Nov 18 16:43:29 2008
@@ -219,15 +219,17 @@
    * to this regionserver are blocked.
    */
   private synchronized void flushSomeRegions() {
-    SortedMap<Long, HRegion> m =
-      this.server.getCopyOfOnlineRegionsSortedBySize();
-    if (m.size() <= 0) {
-      LOG.info("No online regions to flush though we've been asked flush some.");
-      return;
-    }
     // keep flushing until we hit the low water mark
-    while (server.getGlobalMemcacheSize() >= globalMemcacheLimitLowMark) {
+    for (SortedMap<Long, HRegion> m =
+        this.server.getCopyOfOnlineRegionsSortedBySize();
+      server.getGlobalMemcacheSize() >= globalMemcacheLimitLowMark;) {
       // flush the region with the biggest memcache
+      if (m.size() <= 0) {
+        LOG.info("No online regions to flush though we've been asked flush " +
+            "some; globalMemcacheSize=" + this.server.getGlobalMemcacheSize() +
+            ", globalMemcacheLimitLowMark=" + this.globalMemcacheLimitLowMark);
+        break;
+      }
       HRegion biggestMemcacheRegion = m.remove(m.firstKey());
       if (!flushRegion(biggestMemcacheRegion, true)) {
         // Something bad happened - give up.
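
The restructured loop checks m.size() before touching the map because
SortedMap.firstKey() throws NoSuchElementException on an empty map, which is
presumably the failure behind the "NoSuchElementException in flushSomeRegions"
entry for HBASE-990 in CHANGES.txt.  A minimal, stand-alone illustration (not
HBase code):

import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;

public class FirstKeyExample {
  public static void main(String[] args) {
    SortedMap<Long, String> regionsBySize = new TreeMap<Long, String>();
    try {
      // With no online regions left, firstKey() has nothing to return.
      regionsBySize.remove(regionsBySize.firstKey());
    } catch (NoSuchElementException e) {
      System.out.println("firstKey() on an empty map: " + e);
    }
    // The fixed flushSomeRegions() instead logs and breaks out of its loop
    // when the map of candidate regions is empty.
    if (regionsBySize.size() <= 0) {
      System.out.println("no regions left to flush; stop flushing");
    }
  }
}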

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java Tue Nov 18 16:43:29 2008
@@ -45,7 +45,7 @@
 
   private static final byte[] FAMILY = Bytes.toBytes("family:");
   private static final byte[] COL_A = Bytes.toBytes("family:a");
-  private static final String INDEX_COL_A_ASC = "A-Acending";
+  private static final String INDEX_COL_A = "A";
 
   private static final int NUM_ROWS = 10;
   private static final int MAX_VAL = 10000;
@@ -70,8 +70,8 @@
     desc.addFamily(new HColumnDescriptor(FAMILY));
 
     // Create a new index that does lexicographic ordering on COL_A
-    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A_ASC,
-        COL_A, true);
+    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A,
+        COL_A);
     desc.addIndex(colAIndex);
 
     admin = new IndexedTableAdmin(conf);
@@ -97,7 +97,7 @@
   }
   
   private void assertRowsInOrder(int numRowsExpected) throws IndexNotFoundException, IOException {
-    Scanner scanner = table.getIndexedScanner(INDEX_COL_A_ASC,
+    Scanner scanner = table.getIndexedScanner(INDEX_COL_A,
         HConstants.EMPTY_START_ROW, null, null, null);
     int numRows = 0;
     byte[] lastColA = null;

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=718822&r1=718821&r2=718822&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Nov 18 16:43:29 2008
@@ -103,34 +103,39 @@
     assertTrue(cellValues.length == 3);
     r.flushcache();
     r.compactStores();
-    // Always 3 version if that is what max versions is.
+    // Always 3 versions if that is what max versions is.
     byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
     // Increment the least significant character so we get to next row.
     secondRowBytes[START_KEY_BYTES.length - 1]++;
     cellValues = r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100/*Too many*/);
-    LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " + cellValues.length);
+    LOG.info("Count of " + Bytes.toString(secondRowBytes) + ": " +
+      cellValues.length);
     assertTrue(cellValues.length == 3);
 
     // Now add deletes to memcache and then flush it.  That will put us over
     // the compaction threshold of 3 store files.  Compacting these store files
     // should result in a compacted store file that has no references to the
     // deleted row.
-    r.deleteAll(STARTROW, COLUMN_FAMILY_TEXT, System.currentTimeMillis(), null);
+    r.deleteAll(secondRowBytes, COLUMN_FAMILY_TEXT, System.currentTimeMillis(),
+      null);
     // Assert deleted.
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
     r.flushcache();
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
     // Add a bit of data and flush.  Start adding at 'bbb'.
     createSmallerStoreFile(this.r);
     r.flushcache();
-    // Assert that the first row is still deleted.
-    cellValues = r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/);
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    // Assert that the second row is still deleted.
+    cellValues = r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/);
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
     // Force major compaction.
     r.compactStores(true);
     assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
-    assertNull(r.get(STARTROW, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
-    // Make sure the store files do have some 'aaa' keys in them.
+    assertNull(r.get(secondRowBytes, COLUMN_FAMILY_TEXT, -1, 100 /*Too many*/));
+    // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
+    // Also, that compacted store files do not have any secondRowBytes because
+    // they were deleted.
+    int count = 0;
     boolean containsStartRow = false;
     for (MapFile.Reader reader: this.r.stores.
         get(Bytes.mapKey(COLUMN_FAMILY_TEXT_MINUS_COLON)).getReaders()) {
@@ -140,14 +145,16 @@
       while(reader.next(key, val)) {
         if (Bytes.equals(key.getRow(), STARTROW)) {
           containsStartRow = true;
-          break;
+          count++;
+        } else {
+          // After major compaction, should be none of these rows in compacted
+          // file.
+          assertFalse(Bytes.equals(key.getRow(), secondRowBytes));
         }
       }
-      if (containsStartRow) {
-        break;
-      }
     }
     assertTrue(containsStartRow);
+    assertTrue(count == 3);
     // Do a simple TTL test.
     final int ttlInSeconds = 1;
     for (HStore store: this.r.stores.values()) {
@@ -155,6 +162,11 @@
     }
     Thread.sleep(ttlInSeconds * 1000);
     r.compactStores(true);
+    count = count();
+    assertTrue(count == 0);
+  }
+  
+  private int count() throws IOException {
     int count = 0;
     for (MapFile.Reader reader: this.r.stores.
         get(Bytes.mapKey(COLUMN_FAMILY_TEXT_MINUS_COLON)).getReaders()) {
@@ -165,7 +177,7 @@
         count++;
       }
     }
-    assertTrue(count == 0);
+    return count;
   }
 
   private void createStoreFile(final HRegion region) throws IOException {