You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/01/26 23:10:22 UTC
svn commit: r1236411 - in /incubator/accumulo/branches/1.4: docs/
src/server/src/main/java/org/apache/accumulo/server/iterators/
src/server/src/main/java/org/apache/accumulo/server/master/
src/server/src/main/java/org/apache/accumulo/server/tabletserve...
Author: kturner
Date: Thu Jan 26 22:10:21 2012
New Revision: 1236411
URL: http://svn.apache.org/viewvc?rev=1236411&view=rev
Log:
ACCUMULO-334 Made splits copy bulk load flags. Added metadata iterator to delete inactive bulk load flags. Made bulk RW test check all markers are present only once.
Added:
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
Modified:
incubator/accumulo/branches/1.4/docs/config.html
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
Modified: incubator/accumulo/branches/1.4/docs/config.html
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/docs/config.html?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/docs/config.html (original)
+++ incubator/accumulo/branches/1.4/docs/config.html Thu Jan 26 22:10:21 2012
@@ -521,62 +521,69 @@ $HADOOP_HOME/lib/[^.].*.jar,
<td>determines if logs are archived in hdfs</td>
</tr>
<tr >
+ <td>logger.archive.replication</td>
+ <td><b><a href='#COUNT'>count</a></b></td>
+ <td>yes</td>
+ <td><pre>0</pre></td>
+ <td>determines the replication factor for walogs archived in hdfs, set to zero to use default</td>
+ </tr>
+ <tr class='highlight'>
<td>logger.copy.threadpool.size</td>
<td><b><a href='#COUNT'>count</a></b></td>
<td>yes</td>
<td><pre>2</pre></td>
<td>size of the thread pool used to copy files from the local log area to HDFS</td>
</tr>
- <tr class='highlight'>
+ <tr >
<td>logger.dir.walog</td>
<td><b><a href='#PATH'>path</a></b></td>
<td>yes</td>
<td><pre>walogs</pre></td>
<td>The directory used to store write-ahead logs on the local filesystem. It is possible to specify a comma-separated list of directories.</td>
</tr>
- <tr >
+ <tr class='highlight'>
<td>logger.monitor.fs</td>
<td><b><a href='#BOOLEAN'>boolean</a></b></td>
<td>yes</td>
<td><pre>true</pre></td>
<td>When enabled the logger will monitor file systems and kill itself when one switches from rw to ro. This is usually an indication that Linux has detected a bad disk.</td>
</tr>
- <tr class='highlight'>
+ <tr >
<td>logger.port.client</td>
<td><b><a href='#PORT'>port</a></b></td>
<td>yes but requires restart</td>
<td><pre>11224</pre></td>
<td>The port used for write-ahead logger services</td>
</tr>
- <tr >
+ <tr class='highlight'>
<td>logger.port.search</td>
<td><b><a href='#BOOLEAN'>boolean</a></b></td>
<td>yes</td>
<td><pre>false</pre></td>
<td>if the port above is in use, search higher ports until one is available</td>
</tr>
- <tr class='highlight'>
+ <tr >
<td>logger.recovery.file.replication</td>
<td><b><a href='#COUNT'>count</a></b></td>
<td>yes</td>
<td><pre>1</pre></td>
<td>When a logger puts a WALOG into HDFS, it will use this as the replication factor.</td>
</tr>
- <tr >
+ <tr class='highlight'>
<td>logger.server.threadcheck.time</td>
<td><b><a href='#TIMEDURATION'>duration</a></b></td>
<td>yes</td>
<td><pre>1s</pre></td>
<td>The time between adjustments of the server thread pool.</td>
</tr>
- <tr class='highlight'>
+ <tr >
<td>logger.server.threads.minimum</td>
<td><b><a href='#COUNT'>count</a></b></td>
<td>yes</td>
<td><pre>2</pre></td>
<td>The minimum number of threads to use to handle incoming requests.</td>
</tr>
- <tr >
+ <tr class='highlight'>
<td>logger.sort.buffer.size</td>
<td><b><a href='#MEMORY'>memory</a></b></td>
<td>yes</td>
Added: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java?rev=1236411&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java (added)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java Thu Jan 26 22:10:21 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+
+/**
+ * A special iterator for the metadata table that removes inactive bulk load flags.
+ * Delete entries and entries outside the bulk file column family are always kept; init() refuses to run in the scan scope.
+ */
+public class MetadataBulkLoadFilter extends Filter {
+
+ enum Status { // liveness of a bulk transaction as remembered by this iterator instance
+ ACTIVE, INACTIVE
+ }
+
+ Map<Long,Status> bulkTxStatusCache; // transaction id -> status; a fresh map is created in init()
+ ZooArbitrator arbitrator; // queried to find out whether a bulk transaction is still alive
+
+ @Override
+ public boolean accept(Key k, Value v) {
+ if (!k.isDeleted() && k.compareColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY) == 0) { // only non-deleted bulk file entries are examined
+ long txid = Long.valueOf(v.toString()); // the entry's value is parsed as the transaction id; a non-numeric value throws NumberFormatException
+
+ Status status = bulkTxStatusCache.get(txid);
+ if (status == null) { // not seen before by this instance, so ask the arbitrator
+ try {
+ if (arbitrator.transactionAlive(Constants.BULK_ARBITRATOR_TYPE, txid)) {
+ status = Status.ACTIVE;
+ } else {
+ status = Status.INACTIVE;
+ }
+ } catch (Exception e) {
+ // TODO log; when the lookup fails the transaction is treated as active, so the flag is kept rather than dropped
+ status = Status.ACTIVE;
+ }
+
+ bulkTxStatusCache.put(txid, status); // note: the ACTIVE fallback from a failed lookup is cached as well
+ }
+
+ return status == Status.ACTIVE; // flags of inactive transactions are filtered out
+ }
+
+ return true; // everything else passes through untouched
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+
+ if (env.getIteratorScope() == IteratorScope.scan) { // fail fast instead of filtering at scan time
+ throw new IOException("This iterator not intended for use at scan time");
+ }
+
+ bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>(); // cache starts empty on every init
+ arbitrator = new ZooArbitrator();
+ }
+}
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Thu Jan 26 22:10:21 2012
@@ -93,6 +93,7 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.fate.Fate;
import org.apache.accumulo.server.fate.TStore.TStatus;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
import org.apache.accumulo.server.master.CoordinateRecoveryTask.JobComplete;
import org.apache.accumulo.server.master.CoordinateRecoveryTask.LogFile;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
@@ -328,6 +329,9 @@ public class Master implements LiveTServ
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ TablePropUtil.setTableProperty(Constants.METADATA_TABLE_ID, Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20,"
+ + MetadataBulkLoadFilter.class.getName());
+
zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.SKIP);
zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.SKIP);
zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.SKIP);
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu Jan 26 22:10:21 2012
@@ -3434,9 +3434,14 @@ public class Tablet {
String time = tabletTime.getMetadataValue();
+ // it is possible that some of the bulk loading flags will be deleted after being read below because the bulk load
+ // finishes.... therefore split could propagate load flags for a finished bulk load... there is a special iterator
+ // on the !METADATA table to clean up this type of garbage
+ Map<String,Long> bulkLoadedFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
+
MetadataTable.splitTablet(high, extent.getPrevEndRow(), splitRatio, SecurityConstants.getSystemCredentials(), tabletServer.getLock());
- MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes, SecurityConstants.getSystemCredentials(), time,
- lastFlushID, lastCompactID, tabletServer.getLock());
+ MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes, bulkLoadedFiles,
+ SecurityConstants.getSystemCredentials(), time, lastFlushID, lastCompactID, tabletServer.getLock());
MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(), tabletServer.getLock());
log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high);
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Thu Jan 26 22:10:21 2012
@@ -164,9 +164,11 @@ public class SplitRecoveryTest extends F
m.put(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, assignment.server.asColumnQualifier(), assignment.server.asMutationValue());
writer.update(m);
- if (steps >= 1)
- MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID + "0", -1l,
- -1l, zl);
+ if (steps >= 1) {
+ Map<String,Long> bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
+ MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, bulkFiles, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID
+ + "0", -1l, -1l, zl);
+ }
if (steps >= 2)
MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(), zl);
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java Thu Jan 26 22:10:21 2012
@@ -54,8 +54,10 @@ public class BulkPlusOne extends BulkTes
}
}
public static final Text MARKER_CF = new Text("marker");
- private static final AtomicLong counter = new AtomicLong();
+ static final AtomicLong counter = new AtomicLong();
+ private static final Value ONE = new Value("1".getBytes());
+
static void bulkLoadLots(Logger log, State state, Value value) throws Exception {
final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
final Path fail = new Path(dir.toString() + "_fail");
@@ -74,8 +76,8 @@ public class BulkPlusOne extends BulkTes
for (Integer row : startRows)
printRows.add(String.format(FMT, row));
- String markerColumnFamily = Long.toString(counter.incrementAndGet());
- log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnFamily);
+ String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+ log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
List<Integer> rows = new ArrayList<Integer>(startRows);
rows.add(LOTS);
@@ -91,7 +93,7 @@ public class BulkPlusOne extends BulkTes
for (Column col : COLNAMES) {
f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value);
}
- f.append(new Key(row, MARKER_CF, new Text(markerColumnFamily)), value);
+ f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
}
f.close();
}
@@ -101,7 +103,7 @@ public class BulkPlusOne extends BulkTes
FileStatus[] failures = fs.listStatus(fail);
if (failures != null && failures.length > 0)
throw new Exception("Failures " + Arrays.asList(failures) + " found importing files from " + dir);
- log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnFamily);
+ log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
}
@Override
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Thu Jan 26 22:10:21 2012
@@ -57,7 +57,7 @@ public class Setup extends Test {
tableOps.create(getTableName());
IteratorSetting is = new IteratorSetting(10, org.apache.accumulo.core.iterators.user.SummingCombiner.class);
SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
- SummingCombiner.setColumns(is, BulkPlusOne.COLNAMES);
+ SummingCombiner.setCombineAllColumns(is, true);
tableOps.attachIterator(getTableName(), is);
}
} catch (TableExistsException ex) {
@@ -65,6 +65,7 @@ public class Setup extends Test {
}
state.set("rand", rand);
state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
+ BulkPlusOne.counter.set(0l);
BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
ThreadFactory factory = new ThreadFactory() {
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java Thu Jan 26 22:10:21 2012
@@ -17,6 +17,7 @@
package org.apache.accumulo.server.test.randomwalk.bulk;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
@@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -58,6 +60,37 @@ public class Verify extends Test {
throw new Exception("Bad key at " + entry);
}
}
+
+ scanner.clearColumns();
+ scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF);
+ RowIterator rowIter = new RowIterator(scanner);
+
+ while (rowIter.hasNext()) {
+ Iterator<Entry<Key,Value>> row = rowIter.next();
+ long prev = 0;
+ Text rowText = null;
+ while (row.hasNext()) {
+ Entry<Key,Value> entry = row.next();
+
+ if (rowText == null)
+ rowText = entry.getKey().getRow();
+
+ long curr = Long.valueOf(entry.getKey().getColumnQualifier().toString());
+
+ if (curr - 1 != prev)
+ throw new Exception("Bad marker count " + entry.getKey() + " " + entry.getValue() + " " + prev);
+
+ if (!entry.getValue().toString().equals("1"))
+ throw new Exception("Bad marker value " + entry.getKey() + " " + entry.getValue());
+
+ prev = curr;
+ }
+
+ if (BulkPlusOne.counter.get() != prev) {
+ throw new Exception("Row " + rowText + " does not have all markers " + BulkPlusOne.counter.get() + " " + prev);
+ }
+ }
+
log.info("Test successful on table " + Setup.getTableName());
state.getConnector().tableOperations().delete(Setup.getTableName());
}
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Thu Jan 26 22:10:21 2012
@@ -46,6 +46,7 @@ import org.apache.accumulo.server.Server
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.security.ZKAuthenticator;
@@ -92,6 +93,7 @@ public class Initialize {
initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1");
initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName());
initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1");
+ initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
String.format("%s,%s", Constants.METADATA_TABLET_COLUMN_FAMILY.toString(), Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.toString()));
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Thu Jan 26 22:10:21 2012
@@ -417,8 +417,8 @@ public class MetadataTable extends org.a
return sizes;
}
- public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<String,DataFileValue> datafileSizes, AuthInfo credentials,
- String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
+ public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<String,DataFileValue> datafileSizes,
+ Map<String,Long> bulkLoadedFiles, AuthInfo credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
Mutation m = extent.getPrevRowUpdateMutation();
ColumnFQ.put(m, Constants.METADATA_DIRECTORY_COLUMN, new Value(path.getBytes()));
@@ -437,6 +437,11 @@ public class MetadataTable extends org.a
m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode()));
}
+ for (Entry<String,Long> entry : bulkLoadedFiles.entrySet()) {
+ byte[] tidBytes = Long.toString(entry.getValue()).getBytes();
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(tidBytes));
+ }
+
update(credentials, zooLock, m);
}
@@ -594,7 +599,8 @@ public class MetadataTable extends org.a
if (!scanner2.iterator().hasNext()) {
log.debug("Prev tablet " + prevRowKey + " does not exist, need to create it " + metadataPrevEndRow + " " + prevPrevEndRow + " " + splitRatio);
- MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, credentials, time, initFlushID, initCompactID, lock);
+ Map<String,Long> bulkFiles = getBulkFilesLoaded(credentials, metadataEntry);
+ MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, bulkFiles, credentials, time, initFlushID, initCompactID, lock);
} else {
log.debug("Prev tablet " + prevRowKey + " exist, do not need to add it");
}
@@ -1245,6 +1251,26 @@ public class MetadataTable extends org.a
}
}
+ public static Map<String,Long> getBulkFilesLoaded(AuthInfo credentials, KeyExtent extent) { // overload keyed by extent: delegates using extent.getMetadataEntry() as the row
+ return getBulkFilesLoaded(credentials, extent.getMetadataEntry());
+ }
+ // Reads the bulk file column family of one metadata row and returns a map of column qualifier (the file) to the long parsed from the entry's value (the tid).
+ public static Map<String,Long> getBulkFilesLoaded(AuthInfo credentials, Text metadataRow) {
+
+ Map<String,Long> ret = new HashMap<String,Long>();
+ // the scan below is restricted to the single given row and to the bulk file column family
+ Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
+ scanner.setRange(new Range(metadataRow));
+ scanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+ for (Entry<Key,Value> entry : scanner) {
+ String file = entry.getKey().getColumnQualifier().toString();
+ Long tid = Long.parseLong(entry.getValue().toString()); // a non-numeric value surfaces as NumberFormatException to the caller
+
+ ret.put(file, tid);
+ }
+ return ret; // empty map when the row has no bulk file entries
+ }
+
public static void addBulkLoadInProgressFlag(String path) {
Mutation m = new Mutation(Constants.METADATA_BLIP_FLAG_PREFIX + path);