You are viewing a plain text version of this content. The canonical link for it is the original mailing-list archive page (the hyperlink was lost in the plain-text conversion).
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/12/08 07:54:33 UTC
svn commit: r602334 - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Author: jimk
Date: Fri Dec 7 22:54:31 2007
New Revision: 602334
URL: http://svn.apache.org/viewvc?rev=602334&view=rev
Log:
HADOOP-2350 Scanner api returns null row names, or skips row names if different column families do not have entries for some rows
Added:
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Dec 7 22:54:31 2007
@@ -64,6 +64,8 @@
HADOOP-2338 Fix NullPointerException in master server.
HADOOP-2380 REST servlet throws NPE when any value node has an empty string
(Bryan Duxbury via Stack)
+ HADOOP-2350 Scanner api returns null row names, or skips row names if
+ different column families do not have entries for some rows
IMPROVEMENTS
HADOOP-2401 Add convenience put method that takes writable
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java Fri Dec 7 22:54:31 2007
@@ -332,7 +332,7 @@
HRegion root =
new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null, null);
- HInternalScannerInterface rootScanner =
+ HScannerInterface rootScanner =
root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
try {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Fri Dec 7 22:54:31 2007
@@ -1088,7 +1088,7 @@
* @return HScannerInterface
* @throws IOException
*/
- public HInternalScannerInterface getScanner(Text[] cols, Text firstRow,
+ public HScannerInterface getScanner(Text[] cols, Text firstRow,
long timestamp, RowFilterInterface filter) throws IOException {
lock.readLock().lock();
try {
@@ -1485,33 +1485,21 @@
/**
* HScanner is an iterator through a bunch of rows in an HRegion.
*/
- private class HScanner implements HInternalScannerInterface {
+ private class HScanner implements HScannerInterface {
private HInternalScannerInterface[] scanners;
- private boolean wildcardMatch = false;
- private boolean multipleMatchers = false;
+ private TreeMap<Text, byte []>[] resultSets;
+ private HStoreKey[] keys;
/** Create an HScanner with a handle on many HStores. */
@SuppressWarnings("unchecked")
HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
RowFilterInterface filter) throws IOException {
- this.scanners = new HInternalScannerInterface[stores.length];
-// Advance to the first key in each store.
-// All results will match the required column-set and scanTime.
-
+ this.scanners = new HInternalScannerInterface[stores.length];
try {
for (int i = 0; i < stores.length; i++) {
- HInternalScannerInterface scanner =
- scanners[i] =
- stores[i].getScanner(timestamp, cols, firstRow, filter);
-
- if (scanner.isWildcardScanner()) {
- this.wildcardMatch = true;
- }
- if (scanner.isMultipleMatchScanner()) {
- this.multipleMatchers = true;
- }
- }
+ scanners[i] = stores[i].getScanner(timestamp, cols, firstRow, filter);
+ }
} catch(IOException e) {
for (int i = 0; i < this.scanners.length; i++) {
@@ -1521,35 +1509,100 @@
}
throw e;
}
+
+// Advance to the first key in each store.
+// All results will match the required column-set and scanTime.
+
+ this.resultSets = new TreeMap[scanners.length];
+ this.keys = new HStoreKey[scanners.length];
+ for (int i = 0; i < scanners.length; i++) {
+ keys[i] = new HStoreKey();
+ resultSets[i] = new TreeMap<Text, byte []>();
+ if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+
// As we have now successfully completed initialization, increment the
// activeScanner count.
activeScannerCount.incrementAndGet();
}
- /** @return true if the scanner is a wild card scanner */
- public boolean isWildcardScanner() {
- return wildcardMatch;
- }
-
- /** @return true if the scanner is a multiple match scanner */
- public boolean isMultipleMatchScanner() {
- return multipleMatchers;
- }
-
/** {@inheritDoc} */
public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
throws IOException {
- boolean haveResults = false;
+ boolean moreToFollow = false;
+
+ // Find the lowest-possible key.
+
+ Text chosenRow = null;
+ long chosenTimestamp = -1;
+ for (int i = 0; i < this.keys.length; i++) {
+ if (scanners[i] != null &&
+ (chosenRow == null ||
+ (keys[i].getRow().compareTo(chosenRow) < 0) ||
+ ((keys[i].getRow().compareTo(chosenRow) == 0) &&
+ (keys[i].getTimestamp() > chosenTimestamp)))) {
+ chosenRow = new Text(keys[i].getRow());
+ chosenTimestamp = keys[i].getTimestamp();
+ }
+ }
+
+ // Store the key and results for each sub-scanner. Merge them as
+ // appropriate.
+ if (chosenTimestamp >= 0) {
+ // Here we are setting the passed in key with current row+timestamp
+ key.setRow(chosenRow);
+ key.setVersion(chosenTimestamp);
+ key.setColumn(HConstants.EMPTY_TEXT);
+
+ for (int i = 0; i < scanners.length; i++) {
+ if (scanners[i] != null && keys[i].getRow().compareTo(chosenRow) == 0) {
+ // NOTE: We used to do results.putAll(resultSets[i]);
+ // but this had the effect of overwriting newer
+ // values with older ones. So now we only insert
+ // a result if the map does not contain the key.
+ for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
+ if (!results.containsKey(e.getKey())) {
+ results.put(e.getKey(), e.getValue());
+ }
+ }
+ resultSets[i].clear();
+ if (!scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ }
+ }
+
for (int i = 0; i < scanners.length; i++) {
- if (scanners[i] != null) {
- if (scanners[i].next(key, results)) {
- haveResults = true;
- } else {
+ // If the current scanner is non-null AND has a lower-or-equal
+ // row label, then its timestamp is bad. We need to advance it.
+ while ((scanners[i] != null) &&
+ (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+
+ resultSets[i].clear();
+ if (!scanners[i].next(keys[i], resultSets[i])) {
+ closeScanner(i);
+ }
+ }
+ }
+
+ moreToFollow = chosenTimestamp >= 0;
+ if (results == null || results.size() <= 0) {
+ // If we got no results, then there is no more to follow.
+ moreToFollow = false;
+ }
+
+ // Make sure scanners closed if no more results
+ if (!moreToFollow) {
+ for (int i = 0; i < scanners.length; i++) {
+ if (null != scanners[i]) {
closeScanner(i);
}
}
}
- return haveResults;
+ return moreToFollow;
}
@@ -1563,6 +1616,8 @@
}
} finally {
scanners[i] = null;
+ resultSets[i] = null;
+ keys[i] = null;
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Dec 7 22:54:31 2007
@@ -1373,7 +1373,7 @@
requestCount.incrementAndGet();
try {
String scannerName = String.valueOf(scannerId);
- HInternalScannerInterface s = scanners.get(scannerName);
+ HScannerInterface s = scanners.get(scannerName);
if (s == null) {
throw new UnknownScannerException("Name: " + scannerName);
}
@@ -1433,7 +1433,7 @@
try {
HRegion r = getRegion(regionName);
long scannerId = -1L;
- HInternalScannerInterface s =
+ HScannerInterface s =
r.getScanner(cols, firstRow, timestamp, filter);
scannerId = rand.nextLong();
String scannerName = String.valueOf(scannerId);
@@ -1457,7 +1457,7 @@
requestCount.incrementAndGet();
try {
String scannerName = String.valueOf(scannerId);
- HInternalScannerInterface s = null;
+ HScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(scannerName);
}
@@ -1472,9 +1472,8 @@
}
}
- Map<String, HInternalScannerInterface> scanners =
- Collections.synchronizedMap(new HashMap<String,
- HInternalScannerInterface>());
+ Map<String, HScannerInterface> scanners =
+ Collections.synchronizedMap(new HashMap<String, HScannerInterface>());
/**
* Instantiated as a scanner lease.
@@ -1490,7 +1489,7 @@
/** {@inheritDoc} */
public void leaseExpired() {
LOG.info("Scanner " + this.scannerName + " lease expired");
- HInternalScannerInterface s = null;
+ HScannerInterface s = null;
synchronized(scanners) {
s = scanners.remove(this.scannerName);
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java Fri Dec 7 22:54:31 2007
@@ -283,7 +283,7 @@
startTime = System.currentTimeMillis();
- HInternalScannerInterface s =
+ HScannerInterface s =
r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
int numFetched = 0;
try {
@@ -630,7 +630,7 @@
long startTime = System.currentTimeMillis();
- HInternalScannerInterface s =
+ HScannerInterface s =
r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
try {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java Fri Dec 7 22:54:31 2007
@@ -69,7 +69,7 @@
private void scan(boolean validateStartcode, String serverName)
throws IOException {
- HInternalScannerInterface scanner = null;
+ HScannerInterface scanner = null;
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
HStoreKey key = new HStoreKey();
@@ -108,7 +108,7 @@
}
} finally {
- HInternalScannerInterface s = scanner;
+ HScannerInterface s = scanner;
scanner = null;
if(s != null) {
s.close();
Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java?rev=602334&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScannerAPI.java Fri Dec 7 22:54:31 2007
@@ -0,0 +1,165 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+
+/** test the scanner API at all levels */
+public class TestScannerAPI extends HBaseClusterTestCase {
+ private final Text[] columns = new Text[] {
+ new Text("a:"),
+ new Text("b:")
+ };
+ private final Text startRow = new Text("0");
+
+ private final TreeMap<Text, SortedMap<Text, byte[]>> values =
+ new TreeMap<Text, SortedMap<Text, byte[]>>();
+
+ /**
+ * Builds the expected contents: row "1" has a value only in family "a:",
+ * while row "2" has values in both "a:" and "b:" (the HADOOP-2350 case).
+ * @throws Exception
+ */
+ public TestScannerAPI() throws Exception {
+ super();
+ try {
+ TreeMap<Text, byte[]> columns = new TreeMap<Text, byte[]>();
+ columns.put(new Text("a:1"), "1".getBytes(HConstants.UTF8_ENCODING));
+ values.put(new Text("1"), columns);
+ columns = new TreeMap<Text, byte[]>();
+ columns.put(new Text("a:2"), "2".getBytes(HConstants.UTF8_ENCODING));
+ columns.put(new Text("b:2"), "2".getBytes(HConstants.UTF8_ENCODING));
+ values.put(new Text("2"), columns);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ /**
+ * @throws IOException
+ */
+ public void testApi() throws IOException {
+ final String tableName = getName();
+
+ // Create table
+
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+ for (int i = 0; i < columns.length; i++) {
+ tableDesc.addFamily(new HColumnDescriptor(columns[i].toString()));
+ }
+ admin.createTable(tableDesc);
+
+ // Insert values
+
+ HTable table = new HTable(conf, new Text(getName()));
+
+ for (Map.Entry<Text, SortedMap<Text, byte[]>> row: values.entrySet()) {
+ long lockid = table.startUpdate(row.getKey());
+ for (Map.Entry<Text, byte[]> val: row.getValue().entrySet()) {
+ table.put(lockid, val.getKey(), val.getValue());
+ }
+ table.commit(lockid);
+ }
+
+ HRegion region = null;
+ try {
+ SortedMap<Text, HRegion> regions =
+ cluster.getRegionThreads().get(0).getRegionServer().getOnlineRegions();
+ for (Map.Entry<Text, HRegion> e: regions.entrySet()) {
+ if (!e.getValue().getRegionInfo().isMetaRegion()) {
+ region = e.getValue();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ IOException iox = new IOException("error finding region");
+ iox.initCause(e);
+ throw iox;
+ }
+ assertNotNull("no user region found", region);
+ @SuppressWarnings("null")
+ HScannerInterface scanner =
+ region.getScanner(columns, startRow, System.currentTimeMillis(), null);
+ try {
+ verify(scanner);
+ } finally {
+ scanner.close();
+ }
+
+ scanner = table.obtainScanner(columns, startRow);
+ try {
+ verify(scanner);
+ } finally {
+ scanner.close();
+ }
+ scanner = table.obtainScanner(columns, startRow);
+ try {
+ for (Iterator<Map.Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator =
+ scanner.iterator();
+ iterator.hasNext();
+ ) {
+ Map.Entry<HStoreKey, SortedMap<Text, byte[]>> row = iterator.next();
+ HStoreKey key = row.getKey();
+ assertTrue("row key", values.containsKey(key.getRow()));
+
+ SortedMap<Text, byte[]> results = row.getValue();
+ SortedMap<Text, byte[]> columnValues = values.get(key.getRow());
+ assertEquals(columnValues.size(), results.size());
+ for (Map.Entry<Text, byte[]> e: columnValues.entrySet()) {
+ Text column = e.getKey();
+ assertTrue("column", results.containsKey(column));
+ assertTrue("value", Arrays.equals(columnValues.get(column),
+ results.get(column)));
+ }
+ }
+ } finally {
+ scanner.close();
+ }
+ }
+
+ private void verify(HScannerInterface scanner) throws IOException {
+ HStoreKey key = new HStoreKey();
+ SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ while (scanner.next(key, results)) {
+ Text row = key.getRow();
+ assertTrue("row key", values.containsKey(row));
+
+ SortedMap<Text, byte[]> columnValues = values.get(row);
+ assertEquals(columnValues.size(), results.size());
+ for (Map.Entry<Text, byte[]> e: columnValues.entrySet()) {
+ Text column = e.getKey();
+ assertTrue("column", results.containsKey(column));
+ assertTrue("value", Arrays.equals(columnValues.get(column),
+ results.get(column)));
+ }
+ results.clear();
+ }
+ }
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java?rev=602334&r1=602333&r2=602334&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java Fri Dec 7 22:54:31 2007
@@ -228,7 +228,7 @@
final Text firstValue)
throws IOException {
Text [] cols = new Text[] {new Text(column)};
- HInternalScannerInterface s = r.getScanner(cols,
+ HScannerInterface s = r.getScanner(cols,
HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
try {
HStoreKey curKey = new HStoreKey();