You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2011/11/30 18:23:37 UTC

svn commit: r1208531 - in /incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server: fate/Fate.java fate/Repo.java master/Master.java master/tableOps/TraceRepo.java tabletserver/TabletServer.java

Author: ecn
Date: Wed Nov 30 17:23:35 2011
New Revision: 1208531

URL: http://svn.apache.org/viewvc?rev=1208531&view=rev
Log:
ACCUMULO-195
 * reduce dependency on Tracing in FATE
 * wrap FATE operations in a tracing Repo
 * save trace information in the tracing Repo
 * use getDescription() for debug information
 * make tserver hostname information consistent with the other servers
 

Added:
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java   (with props)
Modified:
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Fate.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Repo.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Fate.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Fate.java?rev=1208531&r1=1208530&r2=1208531&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Fate.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Fate.java Wed Nov 30 17:23:35 2011
@@ -23,9 +23,6 @@ import org.apache.accumulo.core.util.Log
 import org.apache.accumulo.server.fate.TStore.TStatus;
 import org.apache.log4j.Logger;
 
-import cloudtrace.instrument.Span;
-import cloudtrace.instrument.Trace;
-import cloudtrace.instrument.TraceRunnable;
 
 /**
  * Fault tolerant executor
@@ -54,12 +51,9 @@ public class Fate<T> {
       while (true) {
         long deferTime = 0;
         long tid = store.reserve();
-        Span span = null;
         try {
           TStatus status = store.getStatus(tid);
           Repo<T> op = store.top(tid);
-          span = Trace.on(op.getDescription());
-          span.data("tid", Long.toHexString(tid));
           if (status == TStatus.FAILED_IN_PROGRESS) {
             processFailed(tid, op);
           } else {
@@ -97,8 +91,6 @@ public class Fate<T> {
           }
         } finally {
           store.unreserve(tid, deferTime);
-          if (span != null)
-            span.stop();
         }
         
       }
@@ -150,7 +142,7 @@ public class Fate<T> {
     
     for (int i = 0; i < numTreads; i++) {
       // TODO: use a ExecutorService, maybe a utility to do these steps throughout the server packages
-      Thread thread = new Daemon(new TraceRunnable(new LoggingRunnable(log, new TransactionRunner())), "Repo runner " + i);
+      Thread thread = new Daemon(new LoggingRunnable(log, new TransactionRunner()), "Repo runner " + i);
       thread.start();
     }
   }
@@ -179,7 +171,7 @@ public class Fate<T> {
         if (autoCleanUp)
           store.setProperty(tid, AUTO_CLEAN_PROP, new Boolean(autoCleanUp));
         
-        store.setProperty(tid, DEBUG_PROP, repo.getClass().getName());
+        store.setProperty(tid, DEBUG_PROP, repo.getDescription());
         
         store.setStatus(tid, TStatus.IN_PROGRESS);
       }

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Repo.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Repo.java?rev=1208531&r1=1208530&r2=1208531&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Repo.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/fate/Repo.java Wed Nov 30 17:23:35 2011
@@ -33,4 +33,5 @@ public interface Repo<T> extends Seriali
   
   // this allows the last fate op to return something to the user
   String getReturn();
+  
 }
\ No newline at end of file

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1208531&r1=1208530&r2=1208531&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Nov 30 17:23:35 2011
@@ -131,6 +131,7 @@ import org.apache.accumulo.server.master
 import org.apache.accumulo.server.master.tableOps.DeleteTable;
 import org.apache.accumulo.server.master.tableOps.RenameTable;
 import org.apache.accumulo.server.master.tableOps.TableRangeOp;
+import org.apache.accumulo.server.master.tableOps.TraceRepo;
 import org.apache.accumulo.server.master.tserverOps.ShutdownTServer;
 import org.apache.accumulo.server.monitor.Monitor;
 import org.apache.accumulo.server.security.Authenticator;
@@ -831,7 +832,7 @@ public class Master implements LiveTServ
       }
       
       long tid = fate.startTransaction();
-      fate.seedTransaction(tid, new ShutdownTServer(doomed, force), false);
+      fate.seedTransaction(tid, new TraceRepo<Master>(new ShutdownTServer(doomed, force)), false);
       fate.waitForCompletion(tid);
       fate.delete(tid);
     }
@@ -938,7 +939,7 @@ public class Master implements LiveTServ
           
           org.apache.accumulo.core.client.admin.TimeType timeType = org.apache.accumulo.core.client.admin.TimeType.valueOf(ByteBufferUtil.toString(arguments
               .get(1)));
-          fate.seedTransaction(opid, new CreateTable(c.user, tableName, timeType, options), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new CreateTable(c.user, tableName, timeType, options)), autoCleanup);
           
           break;
         }
@@ -952,7 +953,7 @@ public class Master implements LiveTServ
           checkTableName(newTableName, TableOperation.RENAME);
           verify(c, tableId, TableOperation.RENAME, check(c, tableId, TablePermission.ALTER_TABLE) || check(c, SystemPermission.ALTER_TABLE));
           
-          fate.seedTransaction(opid, new RenameTable(tableId, oldTableName, newTableName), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new RenameTable(tableId, oldTableName, newTableName)), autoCleanup);
           
           break;
         }
@@ -981,7 +982,7 @@ public class Master implements LiveTServ
             propertiesToSet.put(entry.getKey(), entry.getValue());
           }
           
-          fate.seedTransaction(opid, new CloneTable(c.user, srcTableId, tableName, propertiesToSet, propertiesToExclude), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new CloneTable(c.user, srcTableId, tableName, propertiesToSet, propertiesToExclude)), autoCleanup);
           
           break;
         }
@@ -991,7 +992,7 @@ public class Master implements LiveTServ
           checkNotMetadataTable(tableName, TableOperation.DELETE);
           verify(c, tableId, TableOperation.DELETE, check(c, SystemPermission.DROP_TABLE) || check(c, tableId, TablePermission.DROP_TABLE));
           
-          fate.seedTransaction(opid, new DeleteTable(tableId), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new DeleteTable(tableId)), autoCleanup);
           break;
         }
         case ONLINE: {
@@ -1001,7 +1002,7 @@ public class Master implements LiveTServ
           verify(c, tableId, TableOperation.ONLINE,
               check(c, SystemPermission.SYSTEM) || check(c, SystemPermission.ALTER_TABLE) || check(c, tableId, TablePermission.ALTER_TABLE));
           
-          fate.seedTransaction(opid, new ChangeTableState(tableId, TableOperation.ONLINE), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new ChangeTableState(tableId, TableOperation.ONLINE)), autoCleanup);
           break;
         }
         case OFFLINE: {
@@ -1011,7 +1012,7 @@ public class Master implements LiveTServ
           verify(c, tableId, TableOperation.OFFLINE,
               check(c, SystemPermission.SYSTEM) || check(c, SystemPermission.ALTER_TABLE) || check(c, tableId, TablePermission.ALTER_TABLE));
           
-          fate.seedTransaction(opid, new ChangeTableState(tableId, TableOperation.OFFLINE), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new ChangeTableState(tableId, TableOperation.OFFLINE)), autoCleanup);
           break;
         }
         case MERGE: {
@@ -1024,7 +1025,7 @@ public class Master implements LiveTServ
           verify(c, tableId, TableOperation.MERGE,
               check(c, SystemPermission.SYSTEM) || check(c, SystemPermission.ALTER_TABLE) || check(c, tableId, TablePermission.ALTER_TABLE));
           
-          fate.seedTransaction(opid, new TableRangeOp(MergeInfo.Operation.MERGE, tableId, startRow, endRow), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new TableRangeOp(MergeInfo.Operation.MERGE, tableId, startRow, endRow)), autoCleanup);
           break;
         }
         case DELETE_RANGE: {
@@ -1036,7 +1037,7 @@ public class Master implements LiveTServ
           checkNotMetadataTable(tableName, TableOperation.DELETE_RANGE);
           verify(c, tableId, TableOperation.DELETE_RANGE, check(c, SystemPermission.SYSTEM) || check(c, tableId, TablePermission.WRITE));
           
-          fate.seedTransaction(opid, new TableRangeOp(MergeInfo.Operation.DELETE, tableId, startRow, endRow), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new TableRangeOp(MergeInfo.Operation.DELETE, tableId, startRow, endRow)), autoCleanup);
           break;
         }
         case BULK_IMPORT: {
@@ -1049,7 +1050,7 @@ public class Master implements LiveTServ
           checkNotMetadataTable(tableName, TableOperation.BULK_IMPORT);
           verify(c, tableId, TableOperation.BULK_IMPORT, check(c, tableId, TablePermission.BULK_IMPORT));
           
-          fate.seedTransaction(opid, new BulkImport(tableId, dir, failDir, setTime), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new BulkImport(tableId, dir, failDir, setTime)), autoCleanup);
           break;
         }
         case COMPACT: {
@@ -1060,7 +1061,7 @@ public class Master implements LiveTServ
           verify(c, tableId, TableOperation.COMPACT,
               check(c, tableId, TablePermission.WRITE) || check(c, tableId, TablePermission.ALTER_TABLE) || check(c, SystemPermission.ALTER_TABLE));
           
-          fate.seedTransaction(opid, new CompactRange(tableId, startRow, endRow), autoCleanup);
+          fate.seedTransaction(opid, new TraceRepo<Master>(new CompactRange(tableId, startRow, endRow)), autoCleanup);
           break;
         }
         default:

Added: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java?rev=1208531&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java (added)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java Wed Nov 30 17:23:35 2011
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.tableOps;
+
+import org.apache.accumulo.server.fate.Repo;
+
+import cloudtrace.instrument.Span;
+import cloudtrace.instrument.Trace;
+import cloudtrace.instrument.Tracer;
+import cloudtrace.thrift.TInfo;
+
+/**
+ * 
+ */
+public class TraceRepo<T> implements Repo<T> {
+  
+  private static final long serialVersionUID = 1L;
+
+  TInfo tinfo;
+  Repo<T> repo;
+  
+  public TraceRepo(Repo<T> repo) {
+    this.repo = repo;
+    tinfo = Tracer.traceInfo();
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.accumulo.server.fate.Repo#isReady(long, java.lang.Object)
+   */
+  @Override
+  public long isReady(long tid, T environment) throws Exception {
+    Span span = Trace.trace(tinfo, repo.getDescription());
+    try {
+      return repo.isReady(tid, environment);
+    } finally {
+      span.stop();
+    }
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.accumulo.server.fate.Repo#call(long, java.lang.Object)
+   */
+  @Override
+  public Repo<T> call(long tid, T environment) throws Exception {
+    Span span = Trace.trace(tinfo, repo.getDescription());
+    try {
+      Repo<T> result = repo.call(tid, environment);
+      if (result == null)
+        return result;
+      return new TraceRepo<T>(result);
+    } finally {
+      span.stop();
+    }
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.accumulo.server.fate.Repo#undo(long, java.lang.Object)
+   */
+  @Override
+  public void undo(long tid, T environment) throws Exception {
+    Span span = Trace.trace(tinfo, repo.getDescription());
+    try {
+      repo.undo(tid, environment);
+    } finally {
+      span.stop();
+    }
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.accumulo.server.fate.Repo#getDescription()
+   */
+  @Override
+  public String getDescription() {
+    return repo.getDescription();
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.accumulo.server.fate.Repo#getReturn()
+   */
+  @Override
+  public String getReturn() {
+    return repo.getReturn();
+  }
+
+}

Propchange: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/TraceRepo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1208531&r1=1208530&r2=1208531&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Wed Nov 30 17:23:35 2011
@@ -2922,7 +2922,7 @@ public class TabletServer extends Abstra
       
       if (args.length > 0)
         conf.set("tabletserver.hostname", args[0]);
-      Accumulo.enableTracing(local.getHostAddress(), "tserver");
+      Accumulo.enableTracing(local.getHostName(), "tserver");
     } catch (IOException e) {
       log.fatal("couldn't get a reference to the filesystem. quitting");
       throw new RuntimeException(e);