You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/09/11 19:30:12 UTC
svn commit: r1383487 - in
/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs:
DNNFileSystem.java server/namenode/DistributedNamenodeProxy.java
server/namenode/ZookeeperNameNode.java
Author: ecn
Date: Tue Sep 11 17:30:11 2012
New Revision: 1383487
URL: http://svn.apache.org/viewvc?rev=1383487&view=rev
Log:
ACCUMULO-722: clean up existing code; now runs continuous ingest at scale with agitation, and will shut down cleanly
Modified:
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java?rev=1383487&r1=1383486&r2=1383487&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java (original)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/DNNFileSystem.java Tue Sep 11 17:30:11 2012
@@ -61,6 +61,7 @@ public class DNNFileSystem extends FileS
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
+ log.info("URI: " + uri);
setConf(conf);
FakeNameNode fake = null;
try {
@@ -588,5 +589,4 @@ public class DNNFileSystem extends FileS
dfs.setBalancerBandwidth(bandwidth);
}
-
}
Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java?rev=1383487&r1=1383486&r2=1383487&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java (original)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/DistributedNamenodeProxy.java Tue Sep 11 17:30:11 2012
@@ -34,7 +34,6 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -70,8 +69,9 @@ import org.apache.accumulo.core.data.Ran
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -101,34 +101,24 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.log4j.Logger;
-
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import org.apache.zookeeper.WatchedEvent;
import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.api.CuratorWatcher;
public class DistributedNamenodeProxy implements FakeNameNode {
Executor executor = Executors.newSingleThreadExecutor();
public static class ConnectInfo {
- public ConnectInfo(URI uri) {
- String userInfo = uri.getUserInfo();
- log.info("userInfo " + userInfo);
- for (String part : userInfo.split(";")) {
- String parts[] = part.split("=");
- String attr = parts[0];
- String value = parts[1];
- if (attr.equals("user")) {
- this.username = value;
- } else if (attr.equals("pass")) {
- this.passwd = value.getBytes();
- } else if (attr.equals("keepers")) {
- this.zookeepers = value;
- } else if (attr.equals("instance")) {
- this.instance = value;
- } else {
- throw new RuntimeException("unknown entry " + attr + " in authority information");
- }
- }
+ public ConnectInfo(Configuration conf) {
+ this.passwd = conf.get("dnn.user.password", "").getBytes();
+ if (passwd.length == 0)
+ throw new IllegalArgumentException("dnn.user.password not set");
+ this.username = conf.get("dnn.user.username", "root");
+ this.zookeepers = conf.get("dnn.zookeepers", "localhost");
+ this.instance = conf.get("dnn.instance.name", "");
+ if (instance.length() == 0)
+ throw new IllegalArgumentException("dnn.instance.name not set");
}
public String username;
public byte[] passwd;
@@ -145,15 +135,31 @@ public class DistributedNamenodeProxy im
Replicator() {
targets = new HashSet<DatanodeInfo>();
}
+
+ void start() {
+ zookeeper.getData().usingWatcher(new CuratorWatcher() {
+ @Override
+ public void process(WatchedEvent event) throws Exception {
+ scanDatanodes();
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }});
+ }
DatanodeInfo[] getReplicationTargets(int replicationFactor) throws IOException {
-
- // TODO: periodically scan the datanodes table to find new datanodes
- while (targets.size() == 0) {
- scanDatanodes();
- if (targets.size() > 0)
- break;
- UtilWaitThread.sleep(250);
+
+ synchronized (this) {
+ if (targets.size() == 0) {
+ scanDatanodes();
+ }
+ while (targets.size() == 0) {
+ try {
+ wait(250);
+ } catch (InterruptedException e) {
+ //
+ }
+ }
}
List<DatanodeInfo> targetsCopy = new ArrayList<DatanodeInfo>();
@@ -179,8 +185,8 @@ public class DistributedNamenodeProxy im
private void scanDatanodes() throws IOException {
log.info("scanning datanodes table ..");
HashSet<DatanodeInfo> updatedTargets = new HashSet<DatanodeInfo>();
- BatchScanner scanner = createBatchScanner(datanodesTable, new Range());
- infoIpcPort.fetch(scanner);
+ BatchScanner scanner = createBatchScanner(TABLES.DATANODES, new Range());
+ COLUMNS.IPC_PORT.fetch(scanner);
try {
for (Entry<Key,Value> entry : scanner) {
String nodeName = entry.getKey().getRow().toString();
@@ -214,6 +220,16 @@ public class DistributedNamenodeProxy im
}
}
+ static private Scanner createScanner(Connector conn, String table, Range range) throws IOException {
+ try {
+ Scanner result = conn.createScanner(table, Constants.NO_AUTHS);
+ result.setRange(range);
+ return result;
+ } catch (TableNotFoundException ex) {
+ throw new IOException(ex);
+ }
+ }
+
private static BatchWriter createBatchWriter(Connector conn, String table) throws IOException {
try {
return conn.createBatchWriter(table, 10*1000, 1000, 4);
@@ -254,27 +270,31 @@ public class DistributedNamenodeProxy im
private Random rand = new Random();
private Replicator replicator = new Replicator();
private final Connector conn;
- private final static String namespaceTable = "namespace";
- private final static String blocksTable = "blocks";
- private final static String datanodesTable = "datanodes";
- private final static Text infoFam = new Text("info");
- private final static Text childrenFam = new Text("children");
- private final static Text blocksFam = new Text("blocks");
- private final static Text datanodesFam = new Text("datanodes");
- private final static Text commandFam = new Text("command");
-
- private final static ColumnFQ remaining = new ColumnFQ(infoFam, new Text("remaining"));
- private final static ColumnFQ infoSize = new ColumnFQ(infoFam, new Text("size"));
- private final static ColumnFQ isDir = new ColumnFQ(infoFam, new Text("isDir"));
- private final static ColumnFQ infoCapacity = new ColumnFQ(infoFam, new Text("capacity"));
- private final static ColumnFQ infoIpcPort = new ColumnFQ(infoFam, new Text("ipc_port"));
- private final static ColumnFQ infoUsed = new ColumnFQ(infoFam, new Text("used"));
- private final static ColumnFQ infoReplication = new ColumnFQ(infoFam, new Text("replication"));
- private final static ColumnFQ infoBlockSize = new ColumnFQ(infoFam, new Text("blocksize"));
- private final static ColumnFQ infoModificationTime = new ColumnFQ(infoFam, new Text("create_time"));
- private final static ColumnFQ infoStorageID = new ColumnFQ(infoFam, new Text("storageID"));
- private final static ColumnFQ infoPermission = new ColumnFQ(infoFam, new Text("permission"));
-
+ static final class TABLES {
+ private final static String NAMESPACE = "namespace";
+ private final static String BLOCKS = "blocks";
+ private final static String DATANODES = "datanodes";
+ }
+ static final class FAMILIES {
+ private final static Text INFO = new Text("info");
+ private final static Text CHILDREN = new Text("children");
+ private final static Text BLOCKS = new Text("blocks");
+ private final static Text DATANODES = new Text("datanodes");
+ private final static Text COMMAND = new Text("command");
+ }
+ static final class COLUMNS {
+ private final static ColumnFQ REMAINING = new ColumnFQ(FAMILIES.INFO, new Text("remaining"));
+ private final static ColumnFQ SIZE = new ColumnFQ(FAMILIES.INFO, new Text("size"));
+ private final static ColumnFQ IS_DIR = new ColumnFQ(FAMILIES.INFO, new Text("isDir"));
+ private final static ColumnFQ CAPACITY = new ColumnFQ(FAMILIES.INFO, new Text("capacity"));
+ private final static ColumnFQ IPC_PORT = new ColumnFQ(FAMILIES.INFO, new Text("ipc_port"));
+ private final static ColumnFQ USED = new ColumnFQ(FAMILIES.INFO, new Text("used"));
+ private final static ColumnFQ REPLICATION = new ColumnFQ(FAMILIES.INFO, new Text("replication"));
+ private final static ColumnFQ BLOCK_SIZE = new ColumnFQ(FAMILIES.INFO, new Text("blocksize"));
+ private final static ColumnFQ MODIFICATION_TIME = new ColumnFQ(FAMILIES.INFO, new Text("mtime"));
+ private final static ColumnFQ STORAGE_ID = new ColumnFQ(FAMILIES.INFO, new Text("storageID"));
+ private final static ColumnFQ PERMISSION = new ColumnFQ(FAMILIES.INFO, new Text("permission"));
+ }
private final static Value blank = new Value(new byte[]{});
private final static int QUERY_THREADS = 10;
@@ -289,16 +309,17 @@ public class DistributedNamenodeProxy im
private final CuratorFramework zookeeper;
- public DistributedNamenodeProxy(CuratorFramework keeper, URI uri) throws IOException {
+ public DistributedNamenodeProxy(CuratorFramework keeper, Configuration conf) throws IOException {
log.info("========= Distributed Name Node Proxy init =========");
- ConnectInfo info = new ConnectInfo(uri);
+ ConnectInfo info = new ConnectInfo(conf);
Instance instance = new ZooKeeperInstance(info.instance, info.zookeepers);
zookeeper = keeper;
try {
this.conn = instance.getConnector(info.username, info.passwd);
} catch (Exception e) {
throw new IOException(e);
- }
+ }
+ replicator.start();
// String healthNodeHost = config.get("healthnode");
// if(healthNodeHost == null)
// throw new IOException("error: no healthnode address specified. add one to core-site.xml");
@@ -319,8 +340,8 @@ public class DistributedNamenodeProxy im
log.info("using abandonBlock");
// find the block's position in the list (probably the last one)
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
- bs.fetchColumnFamily(blocksFam);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+ bs.fetchColumnFamily(FAMILIES.BLOCKS);
// delete it from the file
Mutation m = new Mutation(new Text(src));
@@ -330,7 +351,7 @@ public class DistributedNamenodeProxy im
String parts[] = cq.split("_");
long block = Long.parseLong(parts[1]);
if (b.getBlockId() == block) {
- m.putDelete(blocksFam, entry.getKey().getColumnQualifier());
+ m.putDelete(FAMILIES.BLOCKS, entry.getKey().getColumnQualifier());
}
}
} finally {
@@ -341,7 +362,7 @@ public class DistributedNamenodeProxy im
}
// delete the block size and location information
- BatchWriter bw = createBatchWriter(namespaceTable);
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
try {
bw.addMutation(m);
} catch (MutationsRejectedException e) {
@@ -353,10 +374,10 @@ public class DistributedNamenodeProxy im
throw new RuntimeException(e);
}
}
- bw = createBatchWriter(blocksTable);
+ bw = createBatchWriter(TABLES.BLOCKS);
try {
Text row = new Text("" + b.getBlockId());
- bs = createBatchScanner(blocksTable, new Range(row));
+ bs = createBatchScanner(TABLES.BLOCKS, new Range(row));
m = new Mutation(row);
for (Entry<Key,Value> entry : bs) {
m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
@@ -394,9 +415,9 @@ public class DistributedNamenodeProxy im
log.info("using addBlock " + src + " " + clientName);
// get the last block ID and replication
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
- bs.fetchColumnFamily(blocksFam);
- infoReplication.fetch(bs);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+ bs.fetchColumnFamily(FAMILIES.BLOCKS);
+ COLUMNS.REPLICATION.fetch(bs);
// TODO: fetch from configuration
int defaultReplication = 3;
@@ -404,9 +425,9 @@ public class DistributedNamenodeProxy im
int blockPos = 0;
try {
for (Entry<Key,Value> entry : bs) {
- if (entry.getKey().getColumnFamily().equals(blocksFam))
+ if (entry.getKey().getColumnFamily().equals(FAMILIES.BLOCKS))
blockPos++;
- if (infoReplication.hasColumns(entry.getKey()))
+ if (COLUMNS.REPLICATION.hasColumns(entry.getKey()))
replication = Integer.parseInt(entry.getValue().toString());
}
} finally {
@@ -437,8 +458,8 @@ public class DistributedNamenodeProxy im
// record file to block mapping
Mutation nameData = new Mutation(new Text(src.getBytes()));
- nameData.put(blocksFam, new Text(String.format("%08d_%d", blockPos, blockID).getBytes()), blank);
- BatchWriter bw = createBatchWriter(namespaceTable);
+ nameData.put(FAMILIES.BLOCKS, new Text(String.format("%08d_%d", blockPos, blockID).getBytes()), blank);
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
try {
try {
bw.addMutation(nameData);
@@ -456,8 +477,7 @@ public class DistributedNamenodeProxy im
@Override
public LocatedBlock append(String src, String clientName)
throws IOException {
- log.info("using append");
- return null;
+ throw new NotImplementedException();
}
/** ------------ Data Node Protocol Methods -----------
@@ -476,13 +496,13 @@ public class DistributedNamenodeProxy im
// update blocks table
try {
- final BatchWriter bw = createBatchWriter(blocksTable);
+ final BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
try {
for(Block b : blocks) {
if (ZookeeperNameNode.isZooBlockId(b.getBlockId()))
continue;
Mutation blockData = new Mutation(new Text(Long.toString(b.getBlockId())));
- infoBlockSize.put(blockData, new Value(Long.toString(b.getNumBytes()).getBytes()));
+ COLUMNS.BLOCK_SIZE.put(blockData, new Value(Long.toString(b.getNumBytes()).getBytes()));
bw.addMutation(blockData);
}
} finally {
@@ -510,17 +530,17 @@ public class DistributedNamenodeProxy im
log.info(registration.getName() + " reports blocks " + current);
if (current.isEmpty())
return null;
- BatchWriter bw = createBatchWriter(blocksTable);
+ BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
Mutation m = new Mutation(registration.getName());
- Scanner scan = createScanner(datanodesTable);
+ Scanner scan = createScanner(TABLES.DATANODES);
scan.setRange(new Range(registration.getName()));
- scan.fetchColumnFamily(blocksFam);
+ scan.fetchColumnFamily(FAMILIES.BLOCKS);
try {
for (Entry<Key,Value> entry : scan) {
long block = Long.parseLong(entry.getKey().getColumnQualifier().toString());
if (!current.remove(block)) {
// found some block, not in the blocklist, remove the entry
- m.putDelete(blocksFam, entry.getKey().getColumnQualifier());
+ m.putDelete(FAMILIES.BLOCKS, entry.getKey().getColumnQualifier());
}
}
if (!m.getUpdates().isEmpty())
@@ -566,8 +586,8 @@ public class DistributedNamenodeProxy im
// write complete status to namespace?
// does this just help avoid mutations to existent complete files?
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
- bs.fetchColumnFamily(blocksFam);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+ bs.fetchColumnFamily(FAMILIES.BLOCKS);
List<Range> ranges = new ArrayList<Range>();
try {
for (Entry<Key,Value> entry : bs) {
@@ -580,8 +600,8 @@ public class DistributedNamenodeProxy im
if (ranges.isEmpty())
return true;
long fileSize = 0;
- BatchScanner blockScanner = createBatchScanner(blocksTable, ranges.toArray(new Range[]{}));
- infoBlockSize.fetch(blockScanner);
+ BatchScanner blockScanner = createBatchScanner(TABLES.BLOCKS, ranges.toArray(new Range[]{}));
+ COLUMNS.BLOCK_SIZE.fetch(blockScanner);
fileSize = 0;
int count = 0;
try {
@@ -603,8 +623,8 @@ public class DistributedNamenodeProxy im
// write size to namespace table
Mutation fileSizePut = new Mutation(new Text(src.getBytes()));
- infoSize.put(fileSizePut, new Value(Long.toString(fileSize).getBytes()));
- BatchWriter bw = createBatchWriter(namespaceTable);
+ COLUMNS.SIZE.put(fileSizePut, new Value(Long.toString(fileSize).getBytes()));
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
try {
try {
bw.addMutation(fileSizePut);
@@ -638,14 +658,14 @@ public class DistributedNamenodeProxy im
// verify that parent directories exist
byte[] parent = getParentPath(src);
String isDirFlag = null;
- ColumnFQ srcColumn = new ColumnFQ(childrenFam, new Text(src));
+ ColumnFQ srcColumn = new ColumnFQ(FAMILIES.CHILDREN, new Text(src));
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(parent)));
- isDir.fetch(bs);
- bs.fetchColumnFamily(childrenFam);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(parent)));
+ COLUMNS.IS_DIR.fetch(bs);
+ bs.fetchColumnFamily(FAMILIES.CHILDREN);
try {
for (Entry<Key,Value> entry : bs) {
- if (isDir.hasColumns(entry.getKey()))
+ if (COLUMNS.IS_DIR.hasColumns(entry.getKey()))
{
isDirFlag = new String(entry.getValue().get());
}
@@ -673,29 +693,28 @@ public class DistributedNamenodeProxy im
/*
* not yet recorded:
*
- * long length
- * long modification_time
* long access_time
* String owner
* String group
+ * Leases
*/
// TODO: not atomic
try {
- BatchWriter bw = createBatchWriter(namespaceTable);
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
try {
Mutation createRequest = new Mutation(new Text(src));
- infoModificationTime.put(createRequest, now());
- put(createRequest, infoReplication, Short.toString(replication));
- put(createRequest, infoBlockSize, Long.toString(blockSize));
- put(createRequest, infoPermission, masked.toString());
- put(createRequest, isDir, "N");
+ COLUMNS.MODIFICATION_TIME.put(createRequest, now());
+ put(createRequest, COLUMNS.REPLICATION, Short.toString(replication));
+ put(createRequest, COLUMNS.BLOCK_SIZE, Long.toString(blockSize));
+ put(createRequest, COLUMNS.PERMISSION, masked.toString());
+ put(createRequest, COLUMNS.IS_DIR, "N");
bw.addMutation(createRequest);
// record existence of new file in parent dir now or on complete?
Mutation childCreate = new Mutation(new Text(getParentPath(src)));
// TODO: could store that this is a file in the Value
- childCreate.put(childrenFam, new Text(src.getBytes()), blank);
+ childCreate.put(FAMILIES.CHILDREN, new Text(src.getBytes()), blank);
bw.addMutation(childCreate);
} finally {
bw.close();
@@ -748,17 +767,17 @@ public class DistributedNamenodeProxy im
byte[] parent = getParentPath(src);
// determine whether this is a directory
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
- isDir.fetch(bs);
- bs.fetchColumnFamily(childrenFam);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+ COLUMNS.IS_DIR.fetch(bs);
+ bs.fetchColumnFamily(FAMILIES.CHILDREN);
String isDir_ = null;
ArrayList<Text> children = new ArrayList<Text>();
try {
for (Entry<Key,Value> entry : bs) {
- if (isDir.hasColumns(entry.getKey())) {
+ if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
isDir_ = entry.getKey().getColumnQualifier().toString();
- } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+ } else if (entry.getKey().getColumnFamily().equals(FAMILIES.CHILDREN)) {
children.add(entry.getKey().getColumnQualifier());
}
}
@@ -770,15 +789,15 @@ public class DistributedNamenodeProxy im
Mutation childDelete = new Mutation(new Text(parent));
Text srcText = new Text(src);
- childDelete.putDelete(childrenFam, srcText);
- infoModificationTime.put(childDelete, now());
+ childDelete.putDelete(FAMILIES.CHILDREN, srcText);
+ COLUMNS.MODIFICATION_TIME.put(childDelete, now());
ArrayList<Mutation> deletes = new ArrayList<Mutation>();
getDeletes(srcText, deletes);
deletes.add(childDelete);
// delete everything at once
- BatchWriter nw = createBatchWriter(namespaceTable);
+ BatchWriter nw = createBatchWriter(TABLES.NAMESPACE);
Set<Text> blocks = new HashSet<Text>();
try {
try {
@@ -786,7 +805,7 @@ public class DistributedNamenodeProxy im
for (Mutation m : deletes) {
for (ColumnUpdate update : m.getUpdates()) {
byte cf[] = update.getColumnFamily();
- if (blocksFam.compareTo(cf, 0, cf.length) == 0) {
+ if (FAMILIES.BLOCKS.compareTo(cf, 0, cf.length) == 0) {
blocks.add(new Text(new String(update.getColumnQualifier()).split("_", 2)[1]));
}
}
@@ -799,20 +818,20 @@ public class DistributedNamenodeProxy im
log.info("deleting blocks "+ blocks);
Map<String, List<String>> hostBlockMap = new HashMap<String, List<String>>();
// Now remove the blocks
- BatchWriter bw = createBatchWriter(blocksTable);
+ BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
// scan back the entries that go with the blocks
List<Range> ranges = new ArrayList<Range>();
for (Text row : blocks) {
ranges.add(new Range(row));
}
- bs = createBatchScanner(blocksTable, ranges.toArray(new Range[0]));
+ bs = createBatchScanner(TABLES.BLOCKS, ranges.toArray(new Range[0]));
// delete everything that matches our block list
for (Entry<Key,Value> entry : bs) {
if (blocks.contains(entry.getKey().getRow())) {
Mutation m = new Mutation(entry.getKey().getRow());
m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
bw.addMutation(m);
- if (entry.getKey().getColumnFamily().equals(datanodesFam)) {
+ if (entry.getKey().getColumnFamily().equals(FAMILIES.DATANODES)) {
String host = entry.getKey().getColumnQualifier().toString();
String block = entry.getKey().getRow().toString();
List<String> blockList = null;
@@ -828,7 +847,7 @@ public class DistributedNamenodeProxy im
log.info("Host -> block map " + hostBlockMap);
// Create commands to remove the blocks on the datanodes at the next heartbeat
- bw = createBatchWriter(datanodesTable);
+ bw = createBatchWriter(TABLES.DATANODES);
for (Entry<String,List<String>> entry : hostBlockMap.entrySet()) {
String host = entry.getKey();
Block block[] = new Block[entry.getValue().size()];
@@ -838,7 +857,7 @@ public class DistributedNamenodeProxy im
}
Mutation m = new Mutation(host);
DatanodeCommand cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, block);
- m.put(commandFam, new Text(UUID.randomUUID().toString()), new Value(serialize(cmd)));
+ m.put(FAMILIES.COMMAND, new Text(UUID.randomUUID().toString()), new Value(serialize(cmd)));
bw.addMutation(m);
}
bw.close();
@@ -887,15 +906,15 @@ public class DistributedNamenodeProxy im
// get blocks from namespace table
Value fileSizeBytes = null;
java.util.Map<Text, Value> IDs = new TreeMap<Text, Value>();
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
- bs.fetchColumnFamily(blocksFam);
- bs.fetchColumnFamily(infoFam);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+ bs.fetchColumnFamily(FAMILIES.BLOCKS);
+ bs.fetchColumnFamily(FAMILIES.INFO);
try {
log.info("getting blocks for " + src + " from namespace table");
for (Entry<Key,Value> entry : bs) {
- if (infoSize.hasColumns(entry.getKey())) {
+ if (COLUMNS.SIZE.hasColumns(entry.getKey())) {
fileSizeBytes = entry.getValue();
- } else if (entry.getKey().getColumnFamily().equals(blocksFam)) {
+ } else if (entry.getKey().getColumnFamily().equals(FAMILIES.BLOCKS)) {
IDs.put(entry.getKey().getColumnQualifier(), entry.getValue());
}
}
@@ -932,19 +951,19 @@ public class DistributedNamenodeProxy im
log.info("getting host data for block ...");
long blockSize = 0;
ArrayList<DatanodeInfo> dni = new ArrayList<DatanodeInfo>();
- bs = createBatchScanner(blocksTable, new Range(idString));
- bs.fetchColumnFamily(datanodesFam);
- infoBlockSize.fetch(bs);
+ bs = createBatchScanner(TABLES.BLOCKS, new Range(idString));
+ bs.fetchColumnFamily(FAMILIES.DATANODES);
+ COLUMNS.BLOCK_SIZE.fetch(bs);
try {
for (Entry<Key,Value> entry : bs) {
- if (infoBlockSize.hasColumns(entry.getKey())) {
+ if (COLUMNS.BLOCK_SIZE.hasColumns(entry.getKey())) {
blockSize = Long.parseLong(new String(entry.getValue().get()));
fileLength += blockSize;
if (blockSize == 0) {
underConst = true;
}
log.info("got size " + blockSize + " for block " + blockIDString);
- } else if (entry.getKey().getColumnFamily().equals(datanodesFam)) {
+ } else if (entry.getKey().getColumnFamily().equals(FAMILIES.DATANODES)) {
String host = entry.getKey().getColumnQualifier().toString();
dni.add(new DatanodeInfo(new DatanodeID(host)));
log.info("got host: " + new String(host) + " for block " + blockIDString);
@@ -983,7 +1002,28 @@ public class DistributedNamenodeProxy im
@Override
public ContentSummary getContentSummary(String path) throws IOException {
log.info("using getContentSummary");
- return null;
+ if (!path.endsWith("/"))
+ path += "/";
+ long summary[] = {0, 0, 1};
+ Text endRange = new Text(path);
+ endRange.append(new byte[]{(byte)0xff}, 0, 1);
+ Range range = new Range(new Text(path), true, endRange, false);
+ Scanner scanner = createScanner(conn, TABLES.NAMESPACE, range);
+ COLUMNS.SIZE.fetch(scanner);
+ COLUMNS.IS_DIR.fetch(scanner);
+ for (Entry<Key,Value> entry : scanner) {
+ Key key = entry.getKey();
+ if (COLUMNS.SIZE.hasColumns(key)) {
+ summary [0] += Long.parseLong(entry.getValue().toString());
+ }
+ if (COLUMNS.IS_DIR.hasColumns(key)) {
+ if ("Y".equals(entry.getValue().toString()))
+ summary[2]++;
+ else
+ summary[1]++;
+ }
+ }
+ return new ContentSummary(summary[0], summary[1], summary[2]);
}
@Override
@@ -1010,12 +1050,12 @@ public class DistributedNamenodeProxy im
private void getDeletes(Text src, List<Mutation> deletes) throws IOException {
// Maybe this list won't fit in memory?
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
Mutation m = new Mutation(src);
try {
for (Entry<Key,Value> entry : bs) {
Text columnFamily = entry.getKey().getColumnFamily();
- if (columnFamily.equals(childrenFam)) {
+ if (columnFamily.equals(FAMILIES.CHILDREN)) {
getDeletes(entry.getKey().getColumnQualifier(), deletes);
}
log.info("deleting " + src + " " + entry.getKey().getColumnFamily() + ":" + entry.getKey().getColumnQualifier());
@@ -1032,7 +1072,7 @@ public class DistributedNamenodeProxy im
@Override
public HdfsFileStatus getFileInfo(String src) throws IOException {
log.info("using getFileInfo " + src);
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(src));
try {
return loadFileStatus(src, bs.iterator());
} finally {
@@ -1051,17 +1091,17 @@ public class DistributedNamenodeProxy im
ArrayList<HdfsFileStatus> files = new ArrayList<HdfsFileStatus>();
String isDirFlag = null;
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
- bs.fetchColumnFamily(childrenFam);
- isDir.fetch(bs);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+ bs.fetchColumnFamily(FAMILIES.CHILDREN);
+ COLUMNS.IS_DIR.fetch(bs);
List<String> children = new ArrayList<String>();
try {
for (Entry<Key,Value> entry : bs) {
log.info("Looking at entry " + entry.getKey() + " -> " + entry.getValue());
String value = new String(entry.getValue().get());
- if (isDir.hasColumns(entry.getKey())) {
+ if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
isDirFlag = value;
- } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+ } else if (entry.getKey().getColumnFamily().equals(FAMILIES.CHILDREN)) {
children.add(entry.getKey().getColumnQualifier().toString());
}
}
@@ -1076,7 +1116,7 @@ public class DistributedNamenodeProxy im
log.info("Looking at children " + children);
for (String child : children) {
- bs = createBatchScanner(namespaceTable, new Range(child));
+ bs = createBatchScanner(TABLES.NAMESPACE, new Range(child));
try {
HdfsFileStatus stat = loadFileStatus(child, bs.iterator());
files.add(stat);
@@ -1084,7 +1124,6 @@ public class DistributedNamenodeProxy im
bs.close();
}
}
- log.info("files " + files);
return new DirectoryListing(files.toArray(new HdfsFileStatus[files.size()]), 0);
}
@@ -1121,17 +1160,17 @@ public class DistributedNamenodeProxy im
Entry<Key, Value> entry = fileResult.next();
log.info("looking at file data " + entry.getKey() + " -> " + entry.getValue());
String value = new String(entry.getValue().get());
- if (isDir.hasColumns(entry.getKey())) {
+ if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
isDirFlag = value;
- } else if (infoSize.hasColumns(entry.getKey())) {
+ } else if (COLUMNS.SIZE.hasColumns(entry.getKey())) {
length = Long.parseLong(value);
- } else if (infoReplication.hasColumns(entry.getKey())) {
+ } else if (COLUMNS.REPLICATION.hasColumns(entry.getKey())) {
block_replication = Integer.parseInt(value);
- } else if (infoBlockSize.hasColumns(entry.getKey())) {
+ } else if (COLUMNS.BLOCK_SIZE.hasColumns(entry.getKey())) {
blocksize = Long.parseLong(value);
- } else if (infoModificationTime.hasColumns(entry.getKey())) {
+ } else if (COLUMNS.MODIFICATION_TIME.hasColumns(entry.getKey())) {
modification_time = Long.parseLong(value);
- } else if (infoPermission.hasColumns(entry.getKey())) {
+ } else if (COLUMNS.PERMISSION.hasColumns(entry.getKey())) {
permissionString = new String(entry.getValue().get());
}
row = entry.getKey().getRow();
@@ -1143,6 +1182,7 @@ public class DistributedNamenodeProxy im
boolean isdir = isDirFlag.equals("Y");
FsPermission permission = FsPermission.getDefault();
+ log.info("permission string " + permissionString);
if (permissionString != null) {
permission = FsPermission.valueOf((isdir ? "d":"-") + permissionString);
}
@@ -1172,11 +1212,11 @@ public class DistributedNamenodeProxy im
// TODO: check permissions
src = normalizePath(src);
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(new Text(src)));
- isDir.fetch(bs);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(src)));
+ COLUMNS.IS_DIR.fetch(bs);
try {
for (Entry<Key,Value> entry : bs) {
- if (isDir.hasColumns(entry.getKey())) {
+ if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
if (new String(entry.getValue().get()).equals("Y"))
return true;
else
@@ -1191,15 +1231,15 @@ public class DistributedNamenodeProxy im
// verify parent path exists
byte[] parentPath = getParentPath(src);
- bs = createBatchScanner(namespaceTable, new Range(new Text(parentPath)));
- isDir.fetch(bs);
- bs.fetchColumnFamily(childrenFam);
+ bs = createBatchScanner(TABLES.NAMESPACE, new Range(new Text(parentPath)));
+ COLUMNS.IS_DIR.fetch(bs);
+ bs.fetchColumnFamily(FAMILIES.CHILDREN);
String isDirString = null;
try {
for (Entry<Key,Value> entry : bs) {
- if (isDir.hasColumns(entry.getKey())) {
+ if (COLUMNS.IS_DIR.hasColumns(entry.getKey())) {
isDirString = new String(entry.getValue().get());
- } else if (entry.getKey().getColumnFamily().equals(childrenFam)) {
+ } else if (entry.getKey().getColumnFamily().equals(FAMILIES.CHILDREN)) {
}
}
} finally {
@@ -1212,16 +1252,16 @@ public class DistributedNamenodeProxy im
throw new IOException("error: parent " + src + " is not a directory");
// edit namespace
- BatchWriter bw = createBatchWriter(namespaceTable);
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
Mutation m = new Mutation(new Text(parentPath));
- m.put(childrenFam, new Text(src), blank);
+ m.put(FAMILIES.CHILDREN, new Text(src), blank);
//String dirName = getDirName(src);
try {
try {
bw.addMutation(m);
m = new Mutation(new Text(src));
- isDir.put(m, new Value("Y".getBytes()));
- infoModificationTime.put(m, new Value(Long.toString(System.currentTimeMillis()).getBytes()));
+ COLUMNS.IS_DIR.put(m, new Value("Y".getBytes()));
+ COLUMNS.MODIFICATION_TIME.put(m, now());
bw.addMutation(m);
} finally {
bw.close();
@@ -1254,19 +1294,19 @@ public class DistributedNamenodeProxy im
Mutation blockData = new Mutation(new Text(blockIDBytes));
for(int i=0; i < hosts.length; i++)
- blockData.put(datanodesFam, new Text(hosts[i].name.getBytes()), blank);
- BatchWriter bw = createBatchWriter(blocksTable);
+ blockData.put(FAMILIES.DATANODES, new Text(hosts[i].name.getBytes()), blank);
+ BatchWriter bw = createBatchWriter(TABLES.BLOCKS);
try {
bw.addMutation(blockData);
} finally {
bw.close();
}
- bw = createBatchWriter(datanodesTable);
+ bw = createBatchWriter(TABLES.DATANODES);
try {
for(int i=0; i < hosts.length; i++) {
Mutation host = new Mutation(new Text(hosts[i].name));
- host.put(blocksFam, new Text(blockIDBytes), blank);
+ host.put(FAMILIES.BLOCKS, new Text(blockIDBytes), blank);
bw.addMutation(host);
}
} finally {
@@ -1297,9 +1337,9 @@ public class DistributedNamenodeProxy im
// record this datanode's info
try {
if (conn != null) {
- BatchWriter bw = createBatchWriter(datanodesTable);
+ BatchWriter bw = createBatchWriter(TABLES.DATANODES);
Mutation reg = new Mutation(new Text(registration.name.getBytes()));
- infoStorageID.put(reg, new Value(registration.storageID.getBytes()));
+ COLUMNS.STORAGE_ID.put(reg, new Value(registration.storageID.getBytes()));
try {
try {
bw.addMutation(reg);
@@ -1328,9 +1368,9 @@ public class DistributedNamenodeProxy im
FileStatus getFileStatus(String src) throws IOException {
FileStatus result = new FileStatus(false, false);
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(src));
try {
- isDir.fetch(bs);
+ COLUMNS.IS_DIR.fetch(bs);
for (Entry<Key,Value> entry : bs) {
result.exists = true;
if (new String(entry.getValue().get()).equals("Y")) {
@@ -1369,8 +1409,8 @@ public class DistributedNamenodeProxy im
if (dstStatus.exists)
delete(dst, true);
// copy file information
- BatchScanner bs = createBatchScanner(namespaceTable, new Range(src));
- BatchWriter bw = createBatchWriter(namespaceTable);
+ BatchScanner bs = createBatchScanner(TABLES.NAMESPACE, new Range(src));
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
try {
Mutation m = new Mutation(new Text(dst));
Mutation s = new Mutation(new Text(src));
@@ -1383,11 +1423,11 @@ public class DistributedNamenodeProxy im
bw.addMutation(s);
// remove child link in src's parent
m = new Mutation(new Text(getParentPath(src)));
- m.putDelete(childrenFam, new Text(src));
+ m.putDelete(FAMILIES.CHILDREN, new Text(src));
bw.addMutation(m);
// add child link in dst's parent
m = new Mutation(new Text(getParentPath(dst)));
- m.put(childrenFam, new Text(dst), blank);
+ m.put(FAMILIES.CHILDREN, new Text(dst), blank);
} finally {
bs.close();
bw.close();
@@ -1440,7 +1480,7 @@ public class DistributedNamenodeProxy im
SendResult result = new SendResult();
log.info("using sendHeartbeat");
- if (!conn.tableOperations().exists(datanodesTable))
+ if (!conn.tableOperations().exists(TABLES.DATANODES))
return result;
// update datanodes table with info
// skip this if none of the numbers have changed
@@ -1449,12 +1489,12 @@ public class DistributedNamenodeProxy im
dfsUsed != lastDfsUsed ||
remaining != lastRemaining) {
try {
- BatchWriter bw = createBatchWriter(conn, datanodesTable);
+ BatchWriter bw = createBatchWriter(conn, TABLES.DATANODES);
Mutation m = new Mutation(new Text(registration.name.getBytes()));
- infoCapacity.put(m, new Value(Long.toString(capacity).getBytes()));
- infoUsed.put(m, new Value(Long.toString(dfsUsed).getBytes()));
- infoIpcPort.put(m, new Value(Integer.toString(registration.getIpcPort()).getBytes()));
- DistributedNamenodeProxy.remaining.put(m, new Value(Long.toString(remaining).getBytes()));
+ COLUMNS.CAPACITY.put(m, new Value(Long.toString(capacity).getBytes()));
+ COLUMNS.USED.put(m, new Value(Long.toString(dfsUsed).getBytes()));
+ COLUMNS.IPC_PORT.put(m, new Value(Integer.toString(registration.getIpcPort()).getBytes()));
+ COLUMNS.REMAINING.put(m, new Value(Long.toString(remaining).getBytes()));
try {
bw.addMutation(m);
} finally {
@@ -1470,8 +1510,8 @@ public class DistributedNamenodeProxy im
// return a list of commands for the data node
List<DatanodeCommand> commands = new ArrayList<DatanodeCommand>();
try {
- BatchScanner bs = createBatchScanner(datanodesTable, new Range(registration.getName()));
- bs.fetchColumnFamily(commandFam);
+ BatchScanner bs = createBatchScanner(TABLES.DATANODES, new Range(registration.getName()));
+ bs.fetchColumnFamily(FAMILIES.COMMAND);
for (Entry<Key,Value> entry : bs) {
Key key = entry.getKey();
DatanodeCommand command = (DatanodeCommand)deserialize(entry.getValue().get());
@@ -1500,8 +1540,10 @@ public class DistributedNamenodeProxy im
}
try {
if (future.isDone()) {
+ // Ok, we were able to do a heartbeat, so go ahead and delete the commands we're about to return
+ // TODO: this could fail, too.
SendResult result = future.get();
- BatchWriter bw = createBatchWriter(datanodesTable);
+ BatchWriter bw = createBatchWriter(TABLES.DATANODES);
bw.addMutations(result.deletes);
bw.close();
return result.commands.toArray(new DatanodeCommand[0]);
@@ -1555,10 +1597,10 @@ public class DistributedNamenodeProxy im
try {
HdfsFileStatus fileInfo = getFileInfo(src);
if (!fileInfo.getPermission().equals(permission)) {
- BatchWriter bw = createBatchWriter(namespaceTable);
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
try {
Mutation m = new Mutation(src);
- infoPermission.put(m, new Value(permission.toString().getBytes()));
+ COLUMNS.PERMISSION.put(m, new Value(permission.toString().getBytes()));
bw.addMutation(m);
} finally {
bw.close();
@@ -1578,8 +1620,28 @@ public class DistributedNamenodeProxy im
@Override
public boolean setReplication(String src, short replication)
throws IOException {
- unimplemented(src, replication);
- return false;
+ log.info("using setReplicatoon");
+ try {
+ HdfsFileStatus fileInfo = getFileInfo(src);
+ if (fileInfo.isDir())
+ return false;
+
+ if (fileInfo.getReplication() != replication) {
+ BatchWriter bw = createBatchWriter(TABLES.NAMESPACE);
+ try {
+ Mutation m = new Mutation(src);
+ COLUMNS.REPLICATION.put(m, new Value(Short.toString(replication).getBytes()));
+ bw.addMutation(m);
+ } finally {
+ bw.close();
+ }
+ }
+ return true;
+ } catch (FileNotFoundException ex) {
+ return false;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
}
@Override
Modified: accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java?rev=1383487&r1=1383486&r2=1383487&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java (original)
+++ accumulo/branches/ACCUMULO-722/distnn/src/main/java/org/apache/hadoop/hdfs/server/namenode/ZookeeperNameNode.java Tue Sep 11 17:30:11 2012
@@ -62,11 +62,17 @@ import com.netflix.curator.framework.Cur
import com.netflix.curator.framework.CuratorFrameworkFactory.Builder;
import com.netflix.curator.retry.RetryUntilElapsed;
+// Provide a limited NameNode interface that stores data into zookeeper;
+// Lazily create and redirect requests to non-metadata files to the Accumulo-based NameNode.
public class ZookeeperNameNode implements FakeNameNode {
static private Logger log = Logger.getLogger(ZookeeperNameNode.class);
- CuratorFramework keeper;
- Random random = new Random();
+ private final Configuration conf;
+ private final CuratorFramework keeper;
+ private final Random random = new Random();
+ private final String instance;
+
+ private DistributedNamenodeProxy dist = null;
public static class FileInfo implements Serializable {
private static final long serialVersionUID = 1L;
@@ -85,6 +91,7 @@ public class ZookeeperNameNode implement
public long size;
}
+ // An object to store at a zookeeper node that represents a directory's metadata
public static class DirInfo implements Serializable {
private static final long serialVersionUID = 1L;
@@ -93,8 +100,10 @@ public class ZookeeperNameNode implement
}
public long createTime;
+ public String permission = "rwxrwxrwx";
}
+ // An object to store at a zookeeper node that represents block metadata
public static class BlockInfo implements Serializable {
private static final long serialVersionUID = 1L;
@@ -110,23 +119,20 @@ public class ZookeeperNameNode implement
boolean complete;
}
- static Pattern isRoot = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/wal(/.*|$)|/recovery.*|/tables$|/tables/(\\!0|0|1|2)(/.*|$))");
+ // Metadata for these files is stored in zookeeper
+ static Pattern metaDataFileNames = Pattern.compile("/accumulo(|/instance_id.*|/version.*|/walogArchive|/wal(/.*|$)|/recovery.*|/tables$|/tables/(\\!0|\\!1|\\!2|\\!3)(/.*|$))");
static private boolean isZooName(String path) {
- boolean result = isRoot.matcher(path).matches();
+ boolean result = metaDataFileNames.matcher(path).matches();
log.info("Looking at " + path + " isZooName " + result);
return result;
}
+
+ // By convention, blockIds tracked in zookeeper are negative
static public boolean isZooBlockId(long blockId) {
return blockId < 0;
}
- private final URI uri;
- private DistributedNamenodeProxy dist = null;
- private final String instance;
-
- private long start = System.currentTimeMillis();
-
private static URI getURI(Configuration conf) throws IOException {
try {
return new URI(conf.get("fs.default.name"));
@@ -140,16 +146,18 @@ public class ZookeeperNameNode implement
}
public ZookeeperNameNode(Configuration conf, URI uri) throws IOException {
- ConnectInfo info = new ConnectInfo(uri);
+ this.conf = conf;
+ ConnectInfo info = new ConnectInfo(conf);
instance = info.instance;
Builder builder = CuratorFrameworkFactory.builder();
builder.connectString(info.zookeepers);
+ // TODO: configure timeout to zookeeper
+ // TODO: get constant configuration
builder.retryPolicy(new RetryUntilElapsed(120*1000, 500));
//builder.aclProvider(aclProvider);
CuratorFramework client = builder.build();
client.start();
this.keeper = client;
- this.uri = uri;
try {
findDatanodes();
} catch (Exception e) {
@@ -157,12 +165,13 @@ public class ZookeeperNameNode implement
}
}
- private static void unimplemented(Object ... args) {
+ private static void notImplementedWarning(Object ... args) {
Throwable t = new Throwable();
String method = t.getStackTrace()[1].getMethodName();
log.warn(method + " unimplemented, args: " + Arrays.asList(args), t);
}
+ // Create the truly distributed namenode connection; hopefully enough datanodes and tservers are running
private FakeNameNode dist() {
try {
if (dist == null) {
@@ -179,7 +188,7 @@ public class ZookeeperNameNode implement
}
}
if (atLeastOneTserver)
- dist = new DistributedNamenodeProxy(keeper, uri);
+ dist = new DistributedNamenodeProxy(keeper, conf);
}
} catch (Exception ex) {
log.warn(ex, ex);
@@ -276,7 +285,8 @@ public class ZookeeperNameNode implement
byte[] current = keeper.getData().forPath(path);
log.info("Current value for " + src + " is " + new Text(current));
if (overwrite) {
- keeper.setData().forPath(path, data);
+ delete(src, true);
+ create(src, masked, clientName, overwrite, createParent, replication, blockSize);
} else {
throw new FileAlreadyExistsException(src);
}
@@ -304,24 +314,65 @@ public class ZookeeperNameNode implement
@Override
public boolean recoverLease(String src, String clientName) throws IOException {
- unimplemented(src, clientName);
+ notImplementedWarning(src, clientName);
return true;
}
@Override
public boolean setReplication(String src, short replication) throws IOException {
- unimplemented(src, replication);
- return true;
+ if (!isZooName(src))
+ return dist().setReplication(src, replication);
+ try {
+ String path = DNNConstants.NAMESPACE_PATH + src;
+ byte[] data = keeper.getData().forPath(path);
+ Object obj = deserialize(data);
+ if (obj == null) {
+ obj = new DirInfo(System.currentTimeMillis());
+ }
+ if (obj instanceof FileInfo) {
+ FileInfo info = (FileInfo)obj;
+ info.replication = replication;
+ keeper.setData().forPath(path, serialize(obj));
+ return true;
+ }
+ return false;
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
}
@Override
public void setPermission(String src, FsPermission permission) throws IOException {
- unimplemented(src, permission);
+ if (!isZooName(src)) {
+ dist().setPermission(src, permission);
+ return;
+ }
+ try {
+ String path = DNNConstants.NAMESPACE_PATH + src;
+ byte[] data = keeper.getData().forPath(path);
+ Object obj = deserialize(data);
+ if (obj == null) {
+ obj = new DirInfo(System.currentTimeMillis());
+ }
+ if (obj instanceof FileInfo) {
+ FileInfo info = (FileInfo)obj;
+ info.permission = permission.toString();
+ obj = info;
+ }
+ if (obj instanceof DirInfo) {
+ DirInfo info = (DirInfo)obj;
+ info.permission = permission.toString();
+ obj = info;
+ }
+ keeper.setData().forPath(path, serialize(obj));
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ }
}
@Override
public void setOwner(String src, String username, String groupname) throws IOException {
- unimplemented(src, username, groupname);
+ notImplementedWarning(src, username, groupname);
}
@Override
@@ -373,7 +424,7 @@ public class ZookeeperNameNode implement
throw new IOException(e);
}
- short defaultReplication = 3; // TODO: read config
+ short defaultReplication = (short)conf.getInt("dfs.replication", 3);
short replication = -1;
try {
String path = DNNConstants.NAMESPACE_PATH + src;
@@ -391,6 +442,8 @@ public class ZookeeperNameNode implement
// DistibutedNameNode holds the positive blocks
long blockID = -Math.abs(random.nextLong());
+ // this will probably never happen
+ if (blockID == 0) blockID = -1;
Block b = new Block(blockID, 0, 0);
List<String> replicas = randomList.subList(0, Math.min(replication, randomList.size()));
List<DatanodeInfo> targets = new ArrayList<DatanodeInfo>();
@@ -472,7 +525,7 @@ public class ZookeeperNameNode implement
@Override
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
- unimplemented((Object[])blocks);
+ notImplementedWarning((Object[])blocks);
}
private Object getInfo(String path) throws Exception {
@@ -566,7 +619,6 @@ public class ZookeeperNameNode implement
} catch (KeeperException.NoNodeException ex) {
return;
}
- log.info("children of " + path + " is " + children);
Object obj = deserialize(keeper.getData().forPath(path));
if (removeBlocks && obj instanceof FileInfo) {
// create the datanode command to (eventually) delete the blocks
@@ -691,57 +743,57 @@ public class ZookeeperNameNode implement
@Override
public long[] getStats() throws IOException {
- unimplemented();
+ notImplementedWarning();
return null;
}
@Override
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) throws IOException {
- unimplemented(type);
+ notImplementedWarning(type);
return null;
}
@Override
public long getPreferredBlockSize(String filename) throws IOException {
- unimplemented(filename);
+ notImplementedWarning(filename);
return 0;
}
@Override
public boolean setSafeMode(SafeModeAction action) throws IOException {
- unimplemented(action);
+ notImplementedWarning(action);
return false;
}
@Override
public void saveNamespace() throws IOException {
- unimplemented();
+ notImplementedWarning();
}
@Override
public void refreshNodes() throws IOException {
- unimplemented();
+ notImplementedWarning();
}
@Override
public void finalizeUpgrade() throws IOException {
- unimplemented();
+ notImplementedWarning();
}
@Override
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException {
- unimplemented();
+ notImplementedWarning();
return null;
}
@Override
public void metaSave(String filename) throws IOException {
- unimplemented(filename);
+ notImplementedWarning(filename);
}
@Override
public void setBalancerBandwidth(long bandwidth) throws IOException {
- unimplemented(bandwidth);
+ notImplementedWarning(bandwidth);
}
@Override
@@ -767,47 +819,79 @@ public class ZookeeperNameNode implement
return null;
}
+ static private class Summary {
+ long length = 0;
+ long fileCount = 0;
+ long directoryCount = 0;
+ }
+
@Override
public ContentSummary getContentSummary(String path) throws IOException {
- unimplemented(path);
- return null;
+ Summary result = new Summary();
+ DirectoryListing listing = this.getListing(path, null);
+ for (HdfsFileStatus child : listing.getPartialListing()) {
+ if (isZooName(child.getFullName(path))) {
+ zooRecurse(child.getFullName(path), result);
+ } else {
+ ContentSummary dnnSummary = dist.getContentSummary(path);
+ result.length += dnnSummary.getLength();
+ result.fileCount += dnnSummary.getFileCount();
+ result.directoryCount += dnnSummary.getDirectoryCount();
+ }
+ }
+ return new ContentSummary(result.length, result.fileCount, result.directoryCount);
+ }
+
+ private void zooRecurse(String src, Summary summary) throws IOException {
+ HdfsFileStatus result = getFileInfo(src);
+ if (result.isDir()) {
+ summary.directoryCount++;
+ for (HdfsFileStatus child : getListing(src, null).getPartialListing()) {
+ zooRecurse(child.getFullName(src), summary);
+ }
+ return;
+ }
+ summary.fileCount++;
+ summary.length += result.getLen();
}
@Override
public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {
- unimplemented(path, namespaceQuota, diskspaceQuota);
+ notImplementedWarning(path, namespaceQuota, diskspaceQuota);
}
@Override
public void fsync(String src, String client) throws IOException {
- unimplemented(src, client);
+ FakeNameNode dist = dist();
+ if (dist != null)
+ dist.fsync(src, client);
}
@Override
public void setTimes(String src, long mtime, long atime) throws IOException {
- unimplemented(src, mtime, atime);
+ notImplementedWarning(src, mtime, atime);
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
- unimplemented(renewer);
+ notImplementedWarning(renewer);
return null;
}
@Override
public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
- unimplemented(token);
+ notImplementedWarning(token);
return 0;
}
@Override
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException {
- unimplemented(token);
+ notImplementedWarning(token);
}
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
- unimplemented(protocol, clientVersion);
+ notImplementedWarning(protocol, clientVersion);
return 0;
}
@@ -815,6 +899,7 @@ public class ZookeeperNameNode implement
public DatanodeRegistration register(DatanodeRegistration registration) throws IOException {
log.info("register " + registration);
if (keeper != null) {
+ // TODO: don't need to *always* update zookeeper, right?
log.info("registering in zookeeper as " + registration.name);
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DataOutputStream data = new DataOutputStream(stream);
@@ -894,7 +979,7 @@ public class ZookeeperNameNode implement
@Override
public void blocksBeingWrittenReport(DatanodeRegistration registration, long[] blocks) throws IOException {
- unimplemented(registration, new BlockListAsLongs(blocks));
+ notImplementedWarning(registration, new BlockListAsLongs(blocks));
}
@Override
@@ -925,7 +1010,7 @@ public class ZookeeperNameNode implement
@Override
public void errorReport(DatanodeRegistration registration, int errorCode, String msg) throws IOException {
- unimplemented(registration, errorCode, msg);
+ notImplementedWarning(registration, errorCode, msg);
}
@Override
@@ -934,26 +1019,25 @@ public class ZookeeperNameNode implement
// TODO: find out how to get namespace id
// could store this in the info of the / entry
NamespaceInfo nsi = new NamespaceInfo(384837986, 0, 0);
- //throw new RuntimeException();
return nsi;
}
@Override
public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
- unimplemented(comm);
+ notImplementedWarning(comm);
return null;
}
@Override
public long nextGenerationStamp(Block block, boolean fromNN) throws IOException {
- unimplemented(block, fromNN);
+ notImplementedWarning(block, fromNN);
return 0;
}
@Override
public void commitBlockSynchronization(Block block, long newgenerationstamp, long newlength, boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
throws IOException {
- unimplemented(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
+ notImplementedWarning(block, newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
}
}