You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/10/10 01:48:46 UTC
svn commit: r583323 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/dfs/FSEditLog.java
src/java/org/apache/hadoop/dfs/NameNode.java
src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
src/test/org/apache/hadoop/dfs/TestEditLog.java
Author: dhruba
Date: Tue Oct 9 16:48:46 2007
New Revision: 583323
URL: http://svn.apache.org/viewvc?rev=583323&view=rev
Log:
HADOOP-1942. Increase the concurrency of transaction logging to
edits log. Reduce the number of syncs by double-buffering the changes
to the transaction log. (Dhruba Borthakur)
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java (with props)
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (with props)
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583323&r1=583322&r2=583323&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 9 16:48:46 2007
@@ -362,6 +362,10 @@
HADOOP-1971. Warn when job does not specify a jar. (enis via cutting)
+ HADOOP-1942. Increase the concurrency of transaction logging to
+ edits log. Reduce the number of syncs by double-buffering the changes
+ to the transaction log. (Dhruba Borthakur)
+
Release 0.14.2 - 2007-10-09
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=583323&r1=583322&r2=583323&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue Oct 9 16:48:46 2007
@@ -20,6 +20,8 @@
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -27,6 +29,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.lang.Math;
+import java.nio.channels.FileChannel;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.UTF8;
@@ -45,33 +48,167 @@
//the following two are used only for backword compatibility :
@Deprecated private static final byte OP_DATANODE_ADD = 5;
@Deprecated private static final byte OP_DATANODE_REMOVE = 6;
+ // Initial capacity in bytes of each in-memory edit buffer created by
+ // EditLogOutputStream; it is an initial size, not a limit. Static, so it
+ // applies JVM-wide; changed through setBufferCapacity().
+ private static int sizeFlushBuffer = 512*1024;
private ArrayList<EditLogOutputStream> editStreams = null;
private FSImage fsimage = null;
- private long lastModificationTime;
- private long lastSyncTime;
-
- static class EditLogOutputStream extends DataOutputStream {
+ // a monotonically increasing counter that represents transactionIds.
+ // Incremented by logEdit() while holding this object's monitor.
+ private long txid = 0;
+
+ // stores the last synced transactionId.
+ // Set by logSync() to the txid it captured when its sync started.
+ private long synctxid = 0;
+
+ // the time of printing the statistics to the log file.
+ private long lastPrintTime;
+
+ // is a sync currently running?
+ // Read and written under this object's monitor; threads in logSync()
+ // and close() wait() on the monitor while a sync is in progress.
+ private boolean isSyncRunning;
+
+ // these are statistics counters.
+ private long numTransactions; // number of transactions
+ private long totalTimeTransactions; // total time for all transactions
+ private NameNodeMetrics metrics;
+
+ // Mutable holder so that logEdit() can update a thread's value in place.
+ private static class TransactionId {
+ public long txid;
+
+ TransactionId(long value) {
+ this.txid = value;
+ }
+ }
+
+ // stores the most current transactionId of this thread.
+ // The initial value Long.MAX_VALUE means a thread that has not logged an
+ // edit is never considered already synced, so its logSync() does a sync.
+ // NOTE(review): this ThreadLocal is static, i.e. shared by all FSEditLog
+ // instances, while txid/synctxid are per instance — confirm only one
+ // FSEditLog is logged to per thread.
+ private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
+ protected synchronized TransactionId initialValue() {
+ return new TransactionId(Long.MAX_VALUE);
+ }
+ };
+
+ /**
+ * Output stream for a single edits file, using two in-memory buffers.
+ * Edits are appended to the current buffer (getOutputStream()); swap()
+ * makes the other buffer current, and flushAndSyncOld() then writes the
+ * previously current buffer to the file and forces it to disk, so new
+ * edits can be appended while that sync is in progress.
+ */
+ static class EditLogOutputStream {
+ private FileChannel fc; // channel of fp; used to truncate and force
+ private FileOutputStream fp; // the edits file, opened for append
+ private DataOutputStream od; // current stream: either od1 or od2
+ private DataOutputStream od1; // writes into buf1
+ private DataOutputStream od2; // writes into buf2
+ private ByteArrayOutputStream buf1;
+ private ByteArrayOutputStream buf2;
+ private int bufSize; // initial capacity of each buffer
+
+ // these are statistics counters
+ private long numSync; // number of syncs to disk
+ private long totalTimeSync; // total time to sync
+
+ /**
+ * Opens the given file for append and sets up both buffers,
+ * with od1/buf1 as the current one.
+ */
EditLogOutputStream(File name) throws IOException {
- super(new FileOutputStream(name, true)); // open for append
+ bufSize = sizeFlushBuffer;
+ buf1 = new ByteArrayOutputStream(bufSize);
+ buf2 = new ByteArrayOutputStream(bufSize);
+ od1 = new DataOutputStream(buf1);
+ od2 = new DataOutputStream(buf2);
+ od = od1; // start with first buffer
+ fp = new FileOutputStream(name, true); // open for append
+ fc = fp.getChannel();
+ numSync = totalTimeSync = 0;
+ }
+
+ // returns the current output stream
+ DataOutputStream getOutputStream() {
+ return od;
}
+ // Writes the current buffer to the file and forces the file to disk.
void flushAndSync() throws IOException {
- ((FileOutputStream)out).getChannel().force(true);
+ this.flush();
+ fc.force(true);
}
+ // Truncates the file, then writes and syncs the layout version header.
void create() throws IOException {
- ((FileOutputStream)out).getChannel().truncate(0);
- writeInt(FSConstants.LAYOUT_VERSION);
+ fc.truncate(0);
+ od.writeInt(FSConstants.LAYOUT_VERSION);
flushAndSync();
}
+
+ // flush current buffer to the file (no force)
+ private void flush() throws IOException {
+ ByteArrayOutputStream buf = getBuffer();
+ if (buf.size() == 0) {
+ return; // no data to flush
+ }
+ buf.writeTo(fp); // write data to file
+ buf.reset(); // erase all data in buf
+ }
+
+ // Closes the file; fails if the current buffer still holds unwritten data.
+ void close() throws IOException {
+ // close should have been called after all pending transactions
+ // have been flushed & synced.
+ if (getBufSize() != 0) {
+ throw new IOException("FSEditStream has " + getBufSize() +
+ " bytes still to be flushed and cannot " +
+ "be closed.");
+ }
+ od.close();
+ fp.close();
+ buf1 = buf2 = null;
+ od = od1 = od2 = null;
+ }
+
+ // returns the amount of data in the current buffer
+ int getBufSize() {
+ return getBuffer().size();
+ }
+
+ // get the current buffer
+ private ByteArrayOutputStream getBuffer() {
+ if (od == od1) {
+ return buf1;
+ } else {
+ return buf2;
+ }
+ }
+
+ //
+ // Make the other buffer current. Nothing is written here; the buffer
+ // that stops being current is written later by flushAndSyncOld().
+ // Callers must prevent concurrent appends: FSEditLog.logSync() calls
+ // this while holding the FSEditLog monitor, which logEdit() also holds.
+ //
+ void swap() {
+ if (od == od1) {
+ od = od2;
+ } else {
+ od = od1;
+ }
+ }
+
+ //
+ // Write the non-current buffer (filled before the last swap()) to the
+ // file and force it to persistent store; counts and times the sync.
+ //
+ void flushAndSyncOld() throws IOException {
+ numSync++;
+ ByteArrayOutputStream oldbuf;
+ if (od == od1) {
+ oldbuf = buf2;
+ } else {
+ oldbuf = buf1;
+ }
+ long start = FSNamesystem.now();
+ oldbuf.writeTo(fp); // write data to file
+ oldbuf.reset(); // erase all data in buf
+ fc.force(true); // sync to persistent store
+ long end = FSNamesystem.now();
+ totalTimeSync += (end - start);
+ }
+
+ // total time spent in flushAndSyncOld()
+ long getTotalSyncTime() {
+ return totalTimeSync;
+ }
+
+ // number of calls to flushAndSyncOld()
+ long getNumSync() {
+ return numSync;
+ }
}
FSEditLog(FSImage image) {
fsimage = image;
- lastModificationTime = 0;
- lastSyncTime = 0;
+ isSyncRunning = false;
+ // NOTE(review): the metrics object is looked up once, here. If
+ // NameNode.getNameNodeMetrics() still returns null at this point,
+ // logEdit() and logSync() will throw NullPointerException when they
+ // update metrics — confirm NameNode sets its metrics first.
+ metrics = NameNode.getNameNodeMetrics();
}
private File getEditFile(int idx) {
@@ -101,6 +238,7 @@
* @throws IOException
*/
synchronized void open() throws IOException {
+ numTransactions = totalTimeTransactions = 0;
int size = getNumStorageDirs();
if (editStreams == null)
editStreams = new ArrayList<EditLogOutputStream>(size);
@@ -138,9 +276,18 @@
* Shutdown the filestore
*/
synchronized void close() throws IOException {
+ while (isSyncRunning) {
+ try {
+ wait(1000);
+ } catch (InterruptedException ie) {
+ }
+ }
if (editStreams == null) {
return;
}
+ printStatistics(true);
+ numTransactions = totalTimeTransactions = 0;
+
for (int idx = 0; idx < editStreams.size(); idx++) {
EditLogOutputStream eStream = editStreams.get(idx);
try {
@@ -175,6 +322,38 @@
}
/**
+ * The specified streams have IO errors. Remove them from logging
+ * new transactions.
+ *
+ * Each failed stream is located by identity in editStreams and passed to
+ * processIOError(int) by its position in editStreams (not its position in
+ * errorStreams). The position is looked up again for every failed stream,
+ * since handling one stream may presumably remove it and shift the rest.
+ * Exits the process if a stream cannot be found or cannot be processed.
+ *
+ * @param errorStreams streams whose sync failed; null means nothing to do
+ */
+ private void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
+ if (errorStreams == null) {
+ return; // nothing to do
+ }
+ for (int idx = 0; idx < errorStreams.size(); idx++) {
+ EditLogOutputStream eStream = errorStreams.get(idx);
+ int j = 0;
+ for (j = 0; j < editStreams.size(); j++) {
+ if (editStreams.get(j) == eStream) {
+ break;
+ }
+ }
+ if (j == editStreams.size()) {
+ FSNamesystem.LOG.error("Unable to find sync log on which " +
+ " IO error occured. " +
+ "Fatal Error.");
+ Runtime.getRuntime().exit(-1);
+ }
+ try {
+ // j, not idx: processIOError(int) takes an index into editStreams.
+ processIOError(j);
+ } catch (IOException e) {
+ FSNamesystem.LOG.error("Unable to sync edit log. " +
+ "Fatal Error.");
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+ }
+
+ /**
* check if ANY edits.new log exists
*/
boolean existsNew() throws IOException {
@@ -425,65 +604,140 @@
*/
synchronized void logEdit(byte op, Writable w1, Writable w2) {
+ // Appends one record (op, then w1 and w2 when non-null) to the current
+ // in-memory buffer of every edit stream and assigns it a new txid.
+ // Nothing is forced to disk here; callers make the edit durable by
+ // calling logSync() afterwards.
assert this.getNumEditStreams() > 0 : "no editlog streams";
+ long start = FSNamesystem.now();
for (int idx = 0; idx < editStreams.size(); idx++) {
- EditLogOutputStream eStream;
- synchronized (eStream = editStreams.get(idx)) {
+ EditLogOutputStream eStream = editStreams.get(idx);
+ try {
+ DataOutputStream od = eStream.getOutputStream();
+ od.write(op);
+ if (w1 != null) {
+ w1.write(od);
+ }
+ if (w2 != null) {
+ w2.write(od);
+ }
+ } catch (IOException ie) {
try {
- eStream.write(op);
- if (w1 != null) {
- w1.write(eStream);
- }
- if (w2 != null) {
- w2.write(eStream);
- }
- } catch (IOException ie) {
- try {
- processIOError(idx);
- } catch (IOException e) {
- FSNamesystem.LOG.error("Unable to append to edit log. " +
- "Fatal Error.");
- Runtime.getRuntime().exit(-1);
- }
+ // NOTE(review): if processIOError(idx) removes this stream from
+ // editStreams, the loop's idx++ skips the stream that shifts into
+ // slot idx — confirm.
+ processIOError(idx);
+ } catch (IOException e) {
+ FSNamesystem.LOG.error("Unable to append to edit log. " +
+ "Fatal Error.");
+ Runtime.getRuntime().exit(-1);
}
}
}
+ // get a new transactionId
+ txid++;
+
//
- // record the time when new data was written to the edits log
+ // record the transactionId when new data was written to the edits log
//
+ // logSync() compares this per-thread value with synctxid to decide
+ // whether this thread's edits still need to be synced.
- lastModificationTime = System.currentTimeMillis();
+ TransactionId id = (TransactionId)myTransactionId.get();
+ id.txid = txid;
+
+ // update statistics
+ long end = FSNamesystem.now();
+ numTransactions++;
+ totalTimeTransactions += (end-start);
+ metrics.incrNumTransactions(1, (int)(end-start));
}
//
- // flush all data of the Edits log into persistent store
+ // Sync all modifications done by this thread.
+ //
+ // Makes every edit logged so far durable, unless the calling thread's
+ // last edit (recorded by logEdit() in myTransactionId) is already
+ // covered by a completed sync, in which case it returns without I/O.
+ // The FSEditLog monitor is held only for bookkeeping: waiting for a
+ // running sync, capturing txid, swapping buffers, and afterwards
+ // publishing synctxid. The file writes and forces in flushAndSyncOld()
+ // run outside the monitor, so other threads can keep calling logEdit().
//
- synchronized void logSync() {
- assert this.getNumEditStreams() > 0 : "no editlog streams";
+ void logSync() {
+ ArrayList<EditLogOutputStream> errorStreams = null;
+ long syncStart = 0;
+
+ // Fetch the transactionId of this thread.
+ TransactionId id = (TransactionId)myTransactionId.get();
+ long mytxid = id.txid;
+
+ synchronized (this) {
+ assert this.getNumEditStreams() > 0 : "no editlog streams";
+ printStatistics(false);
- //
- // If data was generated before the beginning of the last sync time
- // then there is nothing to flush
- //
- if (lastModificationTime < lastSyncTime) {
- return;
+ // if somebody is already syncing, then wait
+ // (wait() releases the monitor; the 1s timeout only re-checks the
+ // condition, and interrupts are ignored)
+ while (mytxid > synctxid && isSyncRunning) {
+ try {
+ wait(1000);
+ } catch (InterruptedException ie) {
+ }
+ }
+
+ //
+ // If this transaction was already flushed, then nothing to do
+ //
+ if (mytxid <= synctxid) {
+ return;
+ }
+
+ // now, this thread will do the sync
+ // every edit up to txid is in the buffers about to be swapped out
+ syncStart = txid;
+ isSyncRunning = true;
+
+ // swap buffers
+ for (int idx = 0; idx < editStreams.size(); idx++) {
+ EditLogOutputStream eStream = editStreams.get(idx);
+ eStream.swap();
+ }
}
- lastSyncTime = System.currentTimeMillis();
+ // do the sync
+ // NOTE(review): editStreams is read here without the monitor, while
+ // logEdit() may call processIOError(idx) under the monitor — confirm
+ // the list cannot be modified while this loop runs.
+ long start = FSNamesystem.now();
for (int idx = 0; idx < editStreams.size(); idx++) {
- EditLogOutputStream eStream;
- synchronized (eStream = editStreams.get(idx)) {
- try {
- eStream.flushAndSync();
- } catch (IOException ie) {
- try {
- processIOError(idx);
- } catch (IOException e) {
- FSNamesystem.LOG.error("Unable to sync edit log. " +
- "Fatal Error.");
- Runtime.getRuntime().exit(-1);
- }
+ EditLogOutputStream eStream = editStreams.get(idx);
+ try {
+ eStream.flushAndSyncOld();
+ } catch (IOException ie) {
+ //
+ // remember the streams that encountered an error.
+ // They are handed to processIOError(errorStreams) below once the
+ // monitor is re-acquired; despite the "Fatal Error." text, whether
+ // the process exits is decided there.
+ //
+ if (errorStreams == null) {
+ errorStreams = new ArrayList<EditLogOutputStream>(1);
}
+ errorStreams.add(eStream);
+ FSNamesystem.LOG.error("Unable to sync edit log. " +
+ "Fatal Error.");
}
}
+ long elapsed = FSNamesystem.now() - start;
+
+ synchronized (this) {
+ processIOError(errorStreams);
+ synctxid = syncStart;
+ isSyncRunning = false;
+ this.notifyAll();
+ }
+
+ metrics.incrSyncs(1, (int)elapsed);
+ }
+
+ //
+ // Log transaction and sync statistics at most once a minute, or
+ // unconditionally when force is true (close() forces a final report;
+ // logSync() calls it unforced).
+ //
+ private void printStatistics(boolean force) {
+ long now = FSNamesystem.now();
+ // Return early while less than 60 s have passed since the last report.
+ if (lastPrintTime + 60000 > now && !force) {
+ return;
+ }
+ // Nothing to report, and editStreams.get(0) below needs a stream.
+ if (editStreams == null || editStreams.isEmpty()) {
+ return;
+ }
+ lastPrintTime = now;
+ StringBuilder buf = new StringBuilder();
+
+ buf.append("Number of transactions: " + numTransactions +
+ " Total time for transactions(ms): " +
+ totalTimeTransactions);
+ buf.append(" Number of syncs: " + editStreams.get(0).getNumSync());
+ buf.append(" SyncTimes(ms): ");
+ for (int idx = 0; idx < editStreams.size(); idx++) {
+ EditLogOutputStream eStream = editStreams.get(idx);
+ buf.append(eStream.getTotalSyncTime());
+ buf.append(" ");
+ }
+ FSNamesystem.LOG.info(buf);
}
/**
@@ -561,10 +815,10 @@
assert(getNumStorageDirs() == editStreams.size());
long size = 0;
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- synchronized (editStreams.get(idx)) {
- assert(size == 0 || size == getEditFile(idx).length());
- size = getEditFile(idx).length();
- }
+ EditLogOutputStream eStream = editStreams.get(idx);
+ assert(size == 0 ||
+ size == getEditFile(idx).length() + eStream.getBufSize());
+ size = getEditFile(idx).length() + eStream.getBufSize();
}
return size;
}
@@ -653,5 +907,10 @@
*/
synchronized long getFsEditTime() throws IOException {
return getEditFile(0).lastModified();
+ }
+
+ // sets the initial capacity of the flush buffer.
+ // The value is read by the EditLogOutputStream constructor, so only
+ // streams opened after this call use it; it is an initial capacity, not
+ // a limit. The setting is static and shared by every FSEditLog in the JVM.
+ static void setBufferCapacity(int size) {
+ sizeFlushBuffer = size;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=583323&r1=583322&r2=583323&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Oct 9 16:48:46 2007
@@ -31,12 +31,6 @@
import java.util.Collection;
import java.util.Iterator;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.metrics.jvm.JvmMetrics;
-
/**********************************************************
* NameNode serves as both directory namespace manager and
* "inode table" for the Hadoop DFS. There is a single NameNode
@@ -105,62 +99,11 @@
format(conf, false);
}
- private static class NameNodeMetrics implements Updater {
- private final MetricsRecord metricsRecord;
-
- private int numFilesCreated = 0;
- private int numFilesOpened = 0;
- private int numFilesRenamed = 0;
- private int numFilesListed = 0;
-
- NameNodeMetrics(Configuration conf) {
- String sessionId = conf.get("session.id");
- // Initiate Java VM metrics
- JvmMetrics.init("NameNode", sessionId);
- // Create a record for NameNode metrics
- MetricsContext metricsContext = MetricsUtil.getContext("dfs");
- metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
- metricsRecord.setTag("sessionId", sessionId);
- metricsContext.registerUpdater(this);
- }
-
- /**
- * Since this object is a registered updater, this method will be called
- * periodically, e.g. every 5 seconds.
- */
- public void doUpdates(MetricsContext unused) {
- synchronized (this) {
- metricsRecord.incrMetric("files_created", numFilesCreated);
- metricsRecord.incrMetric("files_opened", numFilesOpened);
- metricsRecord.incrMetric("files_renamed", numFilesRenamed);
- metricsRecord.incrMetric("files_listed", numFilesListed);
-
- numFilesCreated = 0;
- numFilesOpened = 0;
- numFilesRenamed = 0;
- numFilesListed = 0;
- }
- metricsRecord.update();
- }
-
- synchronized void createFile() {
- ++numFilesCreated;
- }
-
- synchronized void openFile() {
- ++numFilesOpened;
- }
-
- synchronized void renameFile() {
- ++numFilesRenamed;
- }
-
- synchronized void listFile(int nfiles) {
- numFilesListed += nfiles;
- }
+ // NameNode-wide metrics, kept in a static field so that FSEditLog can
+ // look them up through getNameNodeMetrics(). NOTE(review): neither final
+ // nor volatile, and assigned outside this view — confirm it is set before
+ // any FSEditLog is constructed.
+ static NameNodeMetrics myMetrics;
+
+ // Returns the shared metrics object; presumably null until it is assigned.
+ public static NameNodeMetrics getNameNodeMetrics() {
+ return myMetrics;
}
-
- private NameNodeMetrics myMetrics;
/**
* Initialize the server
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java?rev=583323&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java Tue Oct 9 16:48:46 2007
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+/**
+ * Metrics for the NameNode: file-operation counters plus edit-log
+ * transaction and sync statistics. Values are accumulated between calls to
+ * {@link #doUpdates}, pushed to the "namenode" record of the "dfs" metrics
+ * context, and then reset.
+ */
+class NameNodeMetrics implements Updater {
+ private final MetricsRecord metricsRecord;
+
+ private int numFilesCreated = 0;
+ private int numFilesOpened = 0;
+ private int numFilesRenamed = 0;
+ private int numFilesListed = 0;
+
+ private int numTransactions = 0;
+ private int totalTimeTransactionsLogMemory = 0; // time reported by incrNumTransactions() since last update
+ private int numSyncs = 0;
+ private int totalTimeSyncs = 0; // time reported by incrSyncs() since last update
+
+ NameNodeMetrics(Configuration conf) {
+ String sessionId = conf.get("session.id");
+ // Initiate Java VM metrics
+ JvmMetrics.init("NameNode", sessionId);
+ // Create a record for NameNode metrics
+ MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+ metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
+ metricsRecord.setTag("sessionId", sessionId);
+ metricsContext.registerUpdater(this);
+ }
+
+ /**
+ * Since this object is a registered updater, this method will be called
+ * periodically, e.g. every 5 seconds.
+ * Counters are added to the record; averages are set, because they
+ * describe only the interval since the previous update and adding
+ * per-interval averages together would be meaningless.
+ */
+ public void doUpdates(MetricsContext unused) {
+ synchronized (this) {
+ metricsRecord.incrMetric("files_created", numFilesCreated);
+ metricsRecord.incrMetric("files_opened", numFilesOpened);
+ metricsRecord.incrMetric("files_renamed", numFilesRenamed);
+ metricsRecord.incrMetric("files_listed", numFilesListed);
+ metricsRecord.incrMetric("num_transactions", numTransactions);
+ metricsRecord.setMetric("avg_time_transactions_memory",
+ getAverageTimeTransaction());
+ metricsRecord.incrMetric("num_syncs", numSyncs);
+ metricsRecord.setMetric("avg_time_transactions_sync",
+ getAverageTimeSync());
+
+ numFilesCreated = 0;
+ numFilesOpened = 0;
+ numFilesRenamed = 0;
+ numFilesListed = 0;
+ numTransactions = 0;
+ totalTimeTransactionsLogMemory = 0;
+ numSyncs = 0;
+ totalTimeSyncs = 0;
+ }
+ metricsRecord.update();
+ }
+
+ synchronized void createFile() {
+ ++numFilesCreated;
+ }
+
+ synchronized void openFile() {
+ ++numFilesOpened;
+ }
+
+ synchronized void renameFile() {
+ ++numFilesRenamed;
+ }
+
+ synchronized void listFile(int nfiles) {
+ numFilesListed += nfiles;
+ }
+
+ // Records count edit-log transactions that took time in total.
+ synchronized void incrNumTransactions(int count, int time) {
+ numTransactions += count;
+ totalTimeTransactionsLogMemory += time;
+ }
+
+ // Records count edit-log syncs that took time in total.
+ synchronized void incrSyncs(int count, int time) {
+ numSyncs += count;
+ totalTimeSyncs += time;
+ }
+
+ // Average transaction time in the current interval; 0 if there were none.
+ synchronized private int getAverageTimeTransaction() {
+ if (numTransactions == 0) {
+ return 0;
+ }
+ return totalTimeTransactionsLogMemory/numTransactions;
+ }
+
+ // Average sync time in the current interval; 0 if there were none.
+ synchronized private int getAverageTimeSync() {
+ if (numSyncs == 0) {
+ return 0;
+ }
+ return totalTimeSyncs/numSyncs;
+ }
+}
Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNodeMetrics.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java?rev=583323&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java Tue Oct 9 16:48:46 2007
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Tests concurrent transaction logging to the edits log. numThreads threads
+ * each log numberTransactions create-file records, calling logSync() after
+ * every one. Each edits file is then read back and must contain exactly
+ * numThreads * numberTransactions records.
+ */
+public class TestEditLog extends TestCase {
+ static final int numDatanodes = 1;
+
+ // This test creates numThreads threads and each thread does
+ // numberTransactions Transactions concurrently.
+ int numberTransactions = 1000;
+ int numThreads = 100;
+
+ //
+ // an object that does a bunch of transactions
+ //
+ static class Transactions implements Runnable {
+ FSEditLog editLog;
+ int numTransactions;
+ short replication = 3;
+ long blockSize = 64;
+
+ Transactions(FSEditLog editlog, int num) {
+ editLog = editlog;
+ numTransactions = num;
+ }
+
+ // add a bunch of transactions, syncing after each one.
+ public void run() {
+ for (int i = 0; i < numTransactions; i++) {
+ INodeFile inode = new INodeFile(0, replication, 0, blockSize);
+ editLog.logCreateFile("/filename" + i, inode);
+ editLog.logSync();
+ }
+ }
+ }
+
+ /**
+ * Tests transaction logging in dfs.
+ */
+ public void testEditLog() throws IOException {
+
+ // start a cluster, only to obtain a set of formatted name directories;
+ // shut it down even if startup fails part-way.
+
+ Collection<File> namedirs = null;
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes,
+ true, true, null, null);
+ try {
+ cluster.waitActive();
+ FileSystem fileSys = cluster.getFileSystem();
+ try {
+ namedirs = cluster.getNameDirs();
+ } finally {
+ fileSys.close();
+ }
+ } finally {
+ cluster.shutdown();
+ }
+
+ int numdirs = 0;
+ for (File dir : namedirs) {
+ System.out.println(dir);
+ numdirs++;
+ }
+
+ FSImage fsimage = new FSImage(namedirs);
+ FSEditLog editLog = fsimage.getEditLog();
+
+ // set small size of flush buffer. The size is read when an edit stream
+ // is constructed, hence the close()/open() that follows.
+ FSEditLog.setBufferCapacity(2048);
+ editLog.close();
+ editLog.open();
+
+ // Create threads and make them run transactions concurrently.
+ Thread threadId[] = new Thread[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ Transactions trans = new Transactions(editLog, numberTransactions);
+ threadId[i] = new Thread(trans, "TransactionThread-" + i);
+ threadId[i].start();
+ }
+
+ // wait for all transactions to get over
+ for (int i = 0; i < numThreads; i++) {
+ try {
+ threadId[i].join();
+ } catch (InterruptedException e) {
+ i--; // retry joining the same thread
+ }
+ }
+
+ editLog.close();
+
+ // Verify that we can read in all the transactions that we have written.
+ // If there were any corruptions, it is likely that the reading in
+ // of these transactions will throw an exception.
+ //
+ for (int i = 0; i < numdirs; i++) {
+ File editFile = fsimage.getEditFile(i);
+ System.out.println("Verifying file: " + editFile);
+ int numEdits = editLog.loadFSEdits(editFile);
+ assertTrue("Verification for " + editFile + " failed. " +
+ "Expected " + (numThreads * numberTransactions) + " transactions. "+
+ "Found " + numEdits + " transactions.",
+ numEdits == numThreads * numberTransactions);
+
+ }
+ }
+}
Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL