You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/05 06:42:52 UTC

[10/35] ACCUMULO-2041 extract tablet classes to new files, move tablet-related code to o.a.a.tserver.tablet, make member variables private

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java
new file mode 100644
index 0000000..827b803
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+public class TabletClosedException extends RuntimeException {
+  public TabletClosedException(Exception e) {
+    super(e);
+  }
+
+  public TabletClosedException() {
+    super();
+  }
+
+  private static final long serialVersionUID = 1L;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
new file mode 100644
index 0000000..bd87a5b
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.log.DfsLogger;
+
+public interface TabletCommitter {
+
+  void abortCommit(CommitSession commitSession, List<Mutation> value);
+
+  void commit(CommitSession commitSession, List<Mutation> mutations);
+
+  boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish);
+
+  void finishUpdatingLogsUsed();
+
+  TableConfiguration getTableConfiguration();
+
+  KeyExtent getExtent();
+
+  int getLogId();
+
+  boolean getUseWAL();
+
+  void updateMemoryUsageStats(long estimatedSizeInBytes, long estimatedSizeInBytes2);
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
new file mode 100644
index 0000000..155d6b5
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.log4j.Logger;
+
+class TabletMemory implements Closeable {
+  static private final Logger log = Logger.getLogger(TabletMemory.class);
+  
+  private final TabletCommitter tablet;
+  private InMemoryMap memTable;
+  private InMemoryMap otherMemTable;
+  private InMemoryMap deletingMemTable;
+  private int nextSeq = 1;
+  private CommitSession commitSession;
+
+  TabletMemory(TabletCommitter tablet) {
+    this.tablet = tablet;
+    try {
+      memTable = new InMemoryMap(tablet.getTableConfiguration());
+    } catch (LocalityGroupConfigurationError e) {
+      throw new RuntimeException(e);
+    }
+    commitSession = new CommitSession(tablet, nextSeq, memTable);
+    nextSeq += 2;
+  }
+
+  public InMemoryMap getMemTable() {
+    return memTable;
+  }
+
+  public InMemoryMap getMinCMemTable() {
+    return otherMemTable;
+  }
+
+  public CommitSession prepareForMinC() {
+    if (otherMemTable != null) {
+      throw new IllegalStateException();
+    }
+
+    if (deletingMemTable != null) {
+      throw new IllegalStateException();
+    }
+    if (commitSession == null) {
+      throw new IllegalStateException();
+    }
+
+    otherMemTable = memTable;
+    try {
+      memTable = new InMemoryMap(tablet.getTableConfiguration());
+    } catch (LocalityGroupConfigurationError e) {
+      throw new RuntimeException(e);
+    }
+
+    CommitSession oldCommitSession = commitSession;
+    commitSession = new CommitSession(tablet, nextSeq, memTable);
+    nextSeq += 2;
+
+    tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
+
+    return oldCommitSession;
+  }
+
+  public void finishedMinC() {
+
+    if (otherMemTable == null) {
+      throw new IllegalStateException();
+    }
+
+    if (deletingMemTable != null) {
+      throw new IllegalStateException();
+    }
+    
+    if (commitSession == null) {
+      throw new IllegalStateException();
+    }
+
+    deletingMemTable = otherMemTable;
+
+    otherMemTable = null;
+    tablet.notifyAll();
+  }
+
+  public void finalizeMinC() {
+    if (commitSession == null) {
+      throw new IllegalStateException();
+    }
+    try {
+      deletingMemTable.delete(15000);
+    } finally {
+      synchronized (tablet) {
+        if (otherMemTable != null) {
+          throw new IllegalStateException();
+        }
+
+        if (deletingMemTable == null) {
+          throw new IllegalStateException();
+        }
+
+        deletingMemTable = null;
+
+        tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
+      }
+    }
+  }
+
+  public boolean memoryReservedForMinC() {
+    return otherMemTable != null || deletingMemTable != null;
+  }
+
+  public void waitForMinC() {
+    while (otherMemTable != null || deletingMemTable != null) {
+      try {
+        tablet.wait(50);
+      } catch (InterruptedException e) {
+        log.warn(e, e);
+      }
+    }
+  }
+
+  public void mutate(CommitSession cm, List<Mutation> mutations) {
+    cm.mutate(mutations);
+  }
+
+  public void updateMemoryUsageStats() {
+    long other = 0;
+    if (otherMemTable != null)
+      other = otherMemTable.estimatedSizeInBytes();
+    else if (deletingMemTable != null)
+      other = deletingMemTable.estimatedSizeInBytes();
+
+    tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
+  }
+
+  public List<MemoryIterator> getIterators() {
+    List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
+    toReturn.add(memTable.skvIterator());
+    if (otherMemTable != null)
+      toReturn.add(otherMemTable.skvIterator());
+    return toReturn;
+  }
+
+  public void returnIterators(List<MemoryIterator> iters) {
+    for (MemoryIterator iter : iters) {
+      iter.close();
+    }
+  }
+
+  public long getNumEntries() {
+    if (otherMemTable != null)
+      return memTable.getNumEntries() + otherMemTable.getNumEntries();
+    return memTable.getNumEntries();
+  }
+
+  public CommitSession getCommitSession() {
+    return commitSession;
+  }
+
+  @Override
+  public void close() throws IOException {
+    commitSession = null;
+  }
+
+  public boolean isClosed() {
+    return commitSession == null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
index c5c3316..253c97e 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.accumulo.tserver.Compactor.CountingIterator;
+import org.apache.accumulo.tserver.tablet.Compactor.CountingIterator;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
index f216e93..7cfe65c 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.tablet.RootFiles;
 import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Rule;