You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/05 06:42:52 UTC
[10/35] ACCUMULO-2041 extract tablet classes to new files,
move tablet-related code to o.a.a.tserver.tablet,
make member variables private
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java
new file mode 100644
index 0000000..827b803
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletClosedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+public class TabletClosedException extends RuntimeException {
+ public TabletClosedException(Exception e) {
+ super(e);
+ }
+
+ public TabletClosedException() {
+ super();
+ }
+
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
new file mode 100644
index 0000000..bd87a5b
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.log.DfsLogger;
+
+public interface TabletCommitter {
+
+ void abortCommit(CommitSession commitSession, List<Mutation> value);
+
+ void commit(CommitSession commitSession, List<Mutation> mutations);
+
+ boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish);
+
+ void finishUpdatingLogsUsed();
+
+ TableConfiguration getTableConfiguration();
+
+ KeyExtent getExtent();
+
+ int getLogId();
+
+ boolean getUseWAL();
+
+ void updateMemoryUsageStats(long estimatedSizeInBytes, long estimatedSizeInBytes2);
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
new file mode 100644
index 0000000..155d6b5
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletMemory.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.log4j.Logger;
+
+class TabletMemory implements Closeable {
+ static private final Logger log = Logger.getLogger(TabletMemory.class);
+
+ private final TabletCommitter tablet;
+ private InMemoryMap memTable;
+ private InMemoryMap otherMemTable;
+ private InMemoryMap deletingMemTable;
+ private int nextSeq = 1;
+ private CommitSession commitSession;
+
+ TabletMemory(TabletCommitter tablet) {
+ this.tablet = tablet;
+ try {
+ memTable = new InMemoryMap(tablet.getTableConfiguration());
+ } catch (LocalityGroupConfigurationError e) {
+ throw new RuntimeException(e);
+ }
+ commitSession = new CommitSession(tablet, nextSeq, memTable);
+ nextSeq += 2;
+ }
+
+ public InMemoryMap getMemTable() {
+ return memTable;
+ }
+
+ public InMemoryMap getMinCMemTable() {
+ return otherMemTable;
+ }
+
+ public CommitSession prepareForMinC() {
+ if (otherMemTable != null) {
+ throw new IllegalStateException();
+ }
+
+ if (deletingMemTable != null) {
+ throw new IllegalStateException();
+ }
+ if (commitSession == null) {
+ throw new IllegalStateException();
+ }
+
+ otherMemTable = memTable;
+ try {
+ memTable = new InMemoryMap(tablet.getTableConfiguration());
+ } catch (LocalityGroupConfigurationError e) {
+ throw new RuntimeException(e);
+ }
+
+ CommitSession oldCommitSession = commitSession;
+ commitSession = new CommitSession(tablet, nextSeq, memTable);
+ nextSeq += 2;
+
+ tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
+
+ return oldCommitSession;
+ }
+
+ public void finishedMinC() {
+
+ if (otherMemTable == null) {
+ throw new IllegalStateException();
+ }
+
+ if (deletingMemTable != null) {
+ throw new IllegalStateException();
+ }
+
+ if (commitSession == null) {
+ throw new IllegalStateException();
+ }
+
+ deletingMemTable = otherMemTable;
+
+ otherMemTable = null;
+ tablet.notifyAll();
+ }
+
+ public void finalizeMinC() {
+ if (commitSession == null) {
+ throw new IllegalStateException();
+ }
+ try {
+ deletingMemTable.delete(15000);
+ } finally {
+ synchronized (tablet) {
+ if (otherMemTable != null) {
+ throw new IllegalStateException();
+ }
+
+ if (deletingMemTable == null) {
+ throw new IllegalStateException();
+ }
+
+ deletingMemTable = null;
+
+ tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
+ }
+ }
+ }
+
+ public boolean memoryReservedForMinC() {
+ return otherMemTable != null || deletingMemTable != null;
+ }
+
+ public void waitForMinC() {
+ while (otherMemTable != null || deletingMemTable != null) {
+ try {
+ tablet.wait(50);
+ } catch (InterruptedException e) {
+ log.warn(e, e);
+ }
+ }
+ }
+
+ public void mutate(CommitSession cm, List<Mutation> mutations) {
+ cm.mutate(mutations);
+ }
+
+ public void updateMemoryUsageStats() {
+ long other = 0;
+ if (otherMemTable != null)
+ other = otherMemTable.estimatedSizeInBytes();
+ else if (deletingMemTable != null)
+ other = deletingMemTable.estimatedSizeInBytes();
+
+ tablet.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
+ }
+
+ public List<MemoryIterator> getIterators() {
+ List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
+ toReturn.add(memTable.skvIterator());
+ if (otherMemTable != null)
+ toReturn.add(otherMemTable.skvIterator());
+ return toReturn;
+ }
+
+ public void returnIterators(List<MemoryIterator> iters) {
+ for (MemoryIterator iter : iters) {
+ iter.close();
+ }
+ }
+
+ public long getNumEntries() {
+ if (otherMemTable != null)
+ return memTable.getNumEntries() + otherMemTable.getNumEntries();
+ return memTable.getNumEntries();
+ }
+
+ public CommitSession getCommitSession() {
+ return commitSession;
+ }
+
+ @Override
+ public void close() throws IOException {
+ commitSession = null;
+ }
+
+ public boolean isClosed() {
+ return commitSession == null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
index c5c3316..253c97e 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/CountingIteratorTest.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedMapIterator;
-import org.apache.accumulo.tserver.Compactor.CountingIterator;
+import org.apache.accumulo.tserver.tablet.Compactor.CountingIterator;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
index f216e93..7cfe65c 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/RootFilesTest.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.tablet.RootFiles;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;