Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/01/23 08:36:36 UTC
[18/23] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Moved the lookup of the TTL for DNS-failure caching into fate, since ZooSession needs it and has itself moved there. A sketch of the resulting retry pattern follows the conflict list below.
Conflicts:
fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
server/src/main/java/org/apache/accumulo/server/Accumulo.java
server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
test/src/main/java/org/apache/accumulo/test/randomwalk/security/SecurityHelper.java
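
A minimal sketch of the retry pattern the relocated helper enables (the class and method names below are illustrative, not part of this commit):

    import java.net.UnknownHostException;

    import org.apache.accumulo.fate.util.AddressUtil;

    class DnsAwareRetrySketch {
      // On an UnknownHostException, sleep at least as long as the JVM caches
      // negative DNS responses, so the retry is not answered from that cache.
      static long sleepMillis(long plannedMillis, UnknownHostException cause) {
        // getAddressCacheNegativeTtl returns seconds; add one second of slack
        // and convert to milliseconds, as the merged code below does.
        return Math.max(plannedMillis, (AddressUtil.getAddressCacheNegativeTtl(cause) + 1) * 1000L);
      }
    }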
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c4cd3b1b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c4cd3b1b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c4cd3b1b
Branch: refs/heads/1.5.1-SNAPSHOT
Commit: c4cd3b1bce6963afda3e63a8f7044ecb0b2402ce
Parents: 707e74e f778dcf
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Jan 22 23:20:51 2014 -0600
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Wed Jan 22 23:20:51 2014 -0600
----------------------------------------------------------------------
.../accumulo/core/client/ZooKeeperInstance.java | 10 ++-
.../apache/accumulo/core/util/AddressUtil.java | 3 +-
.../apache/accumulo/fate/util/AddressUtil.java | 60 +++++++++++++
.../accumulo/fate/zookeeper/ZooSession.java | 12 +--
.../accumulo/fate/util/AddressUtilTest.java | 95 ++++++++++++++++++++
.../org/apache/accumulo/server/Accumulo.java | 27 +++++-
.../server/master/tableOps/DeleteTable.java | 8 ++
.../accumulo/server/tabletserver/Compactor.java | 2 +
.../accumulo/server/trace/TraceServer.java | 67 ++++++++++----
.../accumulo/server/util/TabletOperations.java | 10 ++-
10 files changed, 265 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
index 18d55a6,0000000..46a7307
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
@@@ -1,312 -1,0 +1,320 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
++import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.security.CredentialHelper;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * An implementation of Instance that looks in ZooKeeper for the information needed to connect to an instance of Accumulo.
+ *
+ * <p>
+ * The advantage of using ZooKeeper to obtain information about Accumulo is that ZooKeeper is highly available, very responsive, and supports caching.
+ *
+ * <p>
+ * Because multiple instances of Accumulo can share a single set of ZooKeeper servers, all constructors require an Accumulo instance name.
+ *
+ * If you do not know the instance name, run "accumulo org.apache.accumulo.server.util.ListInstances" on an Accumulo server.
+ *
+ */
+
+public class ZooKeeperInstance implements Instance {
+
+ private static final Logger log = Logger.getLogger(ZooKeeperInstance.class);
+
+ private String instanceId = null;
+ private String instanceName = null;
+
+ private final ZooCache zooCache;
+
+ private final String zooKeepers;
+
+ private final int zooKeepersSessionTimeOut;
+
+ /**
+ *
+ * @param instanceName
+ * The name of a specific Accumulo instance. This is set at initialization time.
+ * @param zooKeepers
+ * A comma-separated list of ZooKeeper server locations. Each location can include an optional port, in the format host:port.
+ */
+
+ public ZooKeeperInstance(String instanceName, String zooKeepers) {
+ this(instanceName, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+ }
+
+ /**
+ *
+ * @param instanceName
+ * The name of a specific Accumulo instance. This is set at initialization time.
+ * @param zooKeepers
+ * A comma-separated list of ZooKeeper server locations. Each location can include an optional port, in the format host:port.
+ * @param sessionTimeout
+ * ZooKeeper session timeout, in milliseconds.
+ */
+
+ public ZooKeeperInstance(String instanceName, String zooKeepers, int sessionTimeout) {
+ ArgumentChecker.notNull(instanceName, zooKeepers);
+ this.instanceName = instanceName;
+ this.zooKeepers = zooKeepers;
+ this.zooKeepersSessionTimeOut = sessionTimeout;
+ zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+ getInstanceID();
+ }
+
+ /**
+ *
+ * @param instanceId
+ * The UUID that identifies the Accumulo instance you want to connect to.
+ * @param zooKeepers
+ * A comma-separated list of ZooKeeper server locations. Each location can include an optional port, in the format host:port.
+ */
+
+ public ZooKeeperInstance(UUID instanceId, String zooKeepers) {
+ this(instanceId, zooKeepers, (int) AccumuloConfiguration.getDefaultConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+ }
+
+ /**
+ *
+ * @param instanceId
+ * The UUID that identifies the Accumulo instance you want to connect to.
+ * @param zooKeepers
+ * A comma-separated list of ZooKeeper server locations. Each location can include an optional port, in the format host:port.
+ * @param sessionTimeout
+ * ZooKeeper session timeout, in milliseconds.
+ */
+
+ public ZooKeeperInstance(UUID instanceId, String zooKeepers, int sessionTimeout) {
+ ArgumentChecker.notNull(instanceId, zooKeepers);
+ this.instanceId = instanceId.toString();
+ this.zooKeepers = zooKeepers;
+ this.zooKeepersSessionTimeOut = sessionTimeout;
+ zooCache = ZooCache.getInstance(zooKeepers, sessionTimeout);
+ }
+
+ @Override
+ public String getInstanceID() {
+ if (instanceId == null) {
+ // want the instance id to be stable for the life of this instance object,
+ // so only get it once
+ String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
+ byte[] iidb = zooCache.get(instanceNamePath);
+ if (iidb == null) {
+ throw new RuntimeException("Instance name " + instanceName
+ + " does not exist in zookeeper. Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to see a list.");
+ }
+ instanceId = new String(iidb, Constants.UTF8);
+ }
+
+ if (zooCache.get(Constants.ZROOT + "/" + instanceId) == null) {
+ if (instanceName == null)
+ throw new RuntimeException("Instance id " + instanceId + " does not exist in zookeeper");
+ throw new RuntimeException("Instance id " + instanceId + " pointed to by the name " + instanceName + " does not exist in zookeeper");
+ }
+
+ return instanceId;
+ }
+
+ @Override
+ public List<String> getMasterLocations() {
+ String masterLocPath = ZooUtil.getRoot(this) + Constants.ZMASTER_LOCK;
+
+ OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up master location in zoocache.");
+ byte[] loc = ZooUtil.getLockData(zooCache, masterLocPath);
+ opTimer.stop("Found master at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+ if (loc == null) {
+ return Collections.emptyList();
+ }
+
+ return Collections.singletonList(new String(loc));
+ }
+
+ @Override
+ public String getRootTabletLocation() {
+ String zRootLocPath = ZooUtil.getRoot(this) + Constants.ZROOT_TABLET_LOCATION;
+
+ OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Looking up root tablet location in zookeeper.");
+ byte[] loc = zooCache.get(zRootLocPath);
+ opTimer.stop("Found root tablet at " + (loc == null ? null : new String(loc)) + " in %DURATION%");
+
+ if (loc == null) {
+ return null;
+ }
+
+ return new String(loc).split("\\|")[0];
+ }
+
+ @Override
+ public String getInstanceName() {
+ if (instanceName == null)
+ instanceName = lookupInstanceName(zooCache, UUID.fromString(getInstanceID()));
+
+ return instanceName;
+ }
+
+ @Override
+ public String getZooKeepers() {
+ return zooKeepers;
+ }
+
+ @Override
+ public int getZooKeepersSessionTimeOut() {
+ return zooKeepersSessionTimeOut;
+ }
+
+ @Override
+ @Deprecated
+ public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, TextUtil.getBytes(new Text(pass.toString())));
+ }
+
+ @Override
+ @Deprecated
+ public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(user, ByteBufferUtil.toBytes(pass));
+ }
+
+ @Override
+ public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(CredentialHelper.create(principal, token, getInstanceID()));
+ }
+
+ @SuppressWarnings("deprecation")
+ private Connector getConnector(TCredentials credential) throws AccumuloException, AccumuloSecurityException {
+ return new ConnectorImpl(this, credential);
+ }
+
+ @Override
+ @Deprecated
+ public Connector getConnector(String principal, byte[] pass) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(principal, new PasswordToken(pass));
+ }
+
+ private AccumuloConfiguration conf = null;
+
+ @Override
+ public AccumuloConfiguration getConfiguration() {
+ if (conf == null)
+ conf = AccumuloConfiguration.getDefaultConfiguration();
+ return conf;
+ }
+
+ @Override
+ public void setConfiguration(AccumuloConfiguration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * @deprecated Use {@link #lookupInstanceName(org.apache.accumulo.fate.zookeeper.ZooCache, UUID)} instead
+ */
+ @Deprecated
+ public static String lookupInstanceName(org.apache.accumulo.core.zookeeper.ZooCache zooCache, UUID instanceId) {
+ return lookupInstanceName((ZooCache) zooCache, instanceId);
+ }
+
+ /**
+ * Given a zooCache and instanceId, look up the instance name.
+ *
+ * @param zooCache the cache to search
+ * @param instanceId the UUID of the instance to look up
+ * @return the instance name
+ */
+ public static String lookupInstanceName(ZooCache zooCache, UUID instanceId) {
+ ArgumentChecker.notNull(zooCache, instanceId);
+ for (String name : zooCache.getChildren(Constants.ZROOT + Constants.ZINSTANCES)) {
+ String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name;
+ byte[] bytes = zooCache.get(instanceNamePath);
+ UUID iid = UUID.fromString(new String(bytes, Constants.UTF8));
+ if (iid.equals(instanceId)) {
+ return name;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * To be moved to server code. Only lives here so certain client-side utilities can minimize command-line options.
+ */
+ @Deprecated
+ public static String getInstanceIDFromHdfs(Path instanceDirectory) {
+ try {
+ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), AccumuloConfiguration.getSiteConfiguration());
+ FileStatus[] files = null;
+ try {
+ files = fs.listStatus(instanceDirectory);
+ } catch (FileNotFoundException ex) {
+ // ignored
+ }
+ log.debug("Trying to read instance id from " + instanceDirectory);
+ if (files == null || files.length == 0) {
+ log.error("unable obtain instance id at " + instanceDirectory);
+ throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory);
+ } else if (files.length != 1) {
+ log.error("multiple potential instances in " + instanceDirectory);
+ throw new RuntimeException("Accumulo found multiple possible instance ids in " + instanceDirectory);
+ } else {
+ String result = files[0].getPath().getName();
+ return result;
+ }
+ } catch (IOException e) {
- throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory, e);
++ log.error("Problem reading instance id out of hdfs at " + instanceDirectory, e);
++ throw new RuntimeException("Can't tell if Accumulo is initialized; can't read instance id at " + instanceDirectory, e);
++ } catch (IllegalArgumentException exception) {
++ /* HDFS throws this when there's an UnknownHostException due to DNS troubles. */
++ if (exception.getCause() instanceof UnknownHostException) {
++ log.error("Problem reading instance id out of hdfs at " + instanceDirectory, exception);
++ }
++ throw exception;
+ }
+ }
+
+ @Deprecated
+ @Override
+ public Connector getConnector(org.apache.accumulo.core.security.thrift.AuthInfo auth) throws AccumuloException, AccumuloSecurityException {
+ return getConnector(auth.user, auth.password);
+ }
+}
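
A hypothetical usage of the class above; the instance name, hosts, and credentials are placeholders, not from this commit:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    class ConnectSketch {
      public static void main(String[] args) throws Exception {
        // The (name, keepers) constructor resolves the instance id from ZooKeeper up front.
        Instance instance = new ZooKeeperInstance("myinstance", "zkhost1:2181,zkhost2:2181");
        Connector conn = instance.getConnector("root", new PasswordToken("secret"));
        System.out.println("Connected as " + conn.whoami() + " to instance " + instance.getInstanceID());
      }
    }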
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
index 08ae106,0000000..af9a1a6
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
@@@ -1,50 -1,0 +1,51 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.transport.TSocket;
+
- public class AddressUtil {
++public class AddressUtil extends org.apache.accumulo.fate.util.AddressUtil {
++
+ static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException {
+ String[] parts = address.split(":", 2);
+ if (address.contains("+"))
+ parts = address.split("\\+", 2);
+ if (parts.length == 2) {
+ if (parts[1].isEmpty())
+ return new InetSocketAddress(parts[0], defaultPort);
+ return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
+ }
+ return new InetSocketAddress(address, defaultPort);
+ }
+
+ static public InetSocketAddress parseAddress(Text address, int defaultPort) {
+ return parseAddress(address.toString(), defaultPort);
+ }
+
+ static public TSocket createTSocket(String address, int defaultPort) {
+ InetSocketAddress addr = parseAddress(address, defaultPort);
+ return new TSocket(addr.getHostName(), addr.getPort());
+ }
+
+ static public String toString(InetSocketAddress addr) {
+ return addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ }
+
+}
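
A quick illustration of what the parser above accepts; the host names are placeholders:

    import java.net.InetSocketAddress;

    import org.apache.accumulo.core.util.AddressUtil;

    class ParseAddressSketch {
      public static void main(String[] args) {
        InetSocketAddress a = AddressUtil.parseAddress("tserver1:9997", 9999); // explicit port wins
        InetSocketAddress b = AddressUtil.parseAddress("tserver1+9997", 9999); // '+' also separates host and port
        InetSocketAddress c = AddressUtil.parseAddress("tserver1", 9999);      // bare host falls back to the default
        System.out.println(a + " " + b + " " + c);
      }
    }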
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java
index 0000000,0000000..7a8c269
new file mode 100644
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/util/AddressUtil.java
@@@ -1,0 -1,0 +1,60 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.fate.util;
++
++import java.net.InetAddress; // workaround to enable @see/@link hyperlink
++import java.net.UnknownHostException;
++import java.security.Security;
++
++import org.apache.log4j.Logger;
++
++public class AddressUtil {
++
++ private static final Logger log = Logger.getLogger(AddressUtil.class);
++
++ /**
++ * Fetch the security value that determines how long DNS failures are cached.
++ * Looks up the security property 'networkaddress.cache.negative.ttl'. Should that fail, returns
++ * the default value used in Oracle JVMs 1.4 and later, which is 10 seconds.
++ *
++ * @param originalException the UnknownHostException that prompted this lookup; may be null.
++ * @return positive integer number of seconds
++ * @see java.net.InetAddress
++ * @throws IllegalArgumentException if DNS failures are cached forever
++ */
++ static public int getAddressCacheNegativeTtl(UnknownHostException originalException) {
++ int negativeTtl = 10;
++ try {
++ negativeTtl = Integer.parseInt(Security.getProperty("networkaddress.cache.negative.ttl"));
++ } catch (NumberFormatException exception) {
++ log.warn("Failed to get JVM negative DNS respones cache TTL due to format problem (e.g. this JVM might not have the " +
++ "property). Falling back to default based on Oracle JVM 1.6 (10s)", exception);
++ } catch (SecurityException exception) {
++ log.warn("Failed to get JVM negative DNS response cache TTL due to security manager. Falling back to default based on Oracle JVM 1.6 (10s)", exception);
++ }
++ if (-1 == negativeTtl) {
++ log.error("JVM negative DNS repsonse cache TTL is set to 'forever' and host lookup failed. TTL can be changed with security property " +
++ "'networkaddress.cache.negative.ttl', see java.net.InetAddress.", originalException);
++ throw new IllegalArgumentException(originalException);
++ } else if (0 > negativeTtl) {
++ log.warn("JVM specified negative DNS response cache TTL was negative (and not 'forever'). Falling back to default based on Oracle JVM 1.6 (10s)");
++ negativeTtl = 10;
++ }
++ return negativeTtl;
++ }
++
++}
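
For reference, the security property the helper reads can be exercised like this (a sketch; the 20-second value is only an example, mirroring the unit test added later in this commit):

    import java.security.Security;

    import org.apache.accumulo.fate.util.AddressUtil;

    class NegativeTtlSketch {
      public static void main(String[] args) {
        // JVM-wide security property: cache negative DNS responses for 20 seconds.
        Security.setProperty("networkaddress.cache.negative.ttl", "20");
        System.out.println("negative ttl: " + AddressUtil.getAddressCacheNegativeTtl(null) + "s");
      }
    }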
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
index 040b01d,0000000..13f6d08
mode 100644,000000..100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
@@@ -1,160 -1,0 +1,162 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
-
++import org.apache.accumulo.fate.util.AddressUtil;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public class ZooSession {
+
+ public static class ZooSessionShutdownException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ }
+
+ private static final Logger log = Logger.getLogger(ZooSession.class);
+
+ private static class ZooSessionInfo {
+ public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) {
+ this.zooKeeper = zooKeeper;
+ }
+
+ ZooKeeper zooKeeper;
+ }
+
+ private static Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>();
+
+ private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) {
+ return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? "" : new String(auth));
+ }
+
+ private static class ZooWatcher implements Watcher {
+
+ public void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.Expired) {
+ log.debug("Session expired, state of current session : " + event.getState());
+ }
+ }
+
+ }
+
+ public static ZooKeeper connect(String host, int timeout, String scheme, byte[] auth, Watcher watcher) {
+ final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
+ final int TOTAL_CONNECT_TIME_WAIT_MS = 10 * 1000;
+ boolean tryAgain = true;
+ int sleepTime = 100;
+ ZooKeeper zooKeeper = null;
+
+ long startTime = System.currentTimeMillis();
+
+ while (tryAgain) {
+ try {
+ zooKeeper = new ZooKeeper(host, timeout, watcher);
+ // it may take some time to get connected to zookeeper if some of the servers are down
+ for (int i = 0; i < TOTAL_CONNECT_TIME_WAIT_MS / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) {
+ if (zooKeeper.getState().equals(States.CONNECTED)) {
+ if (auth != null)
+ zooKeeper.addAuthInfo(scheme, auth);
+ tryAgain = false;
+ } else
+ UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
+ }
+
+ if (System.currentTimeMillis() - startTime > 2 * timeout)
+ throw new RuntimeException("Failed to connect to zookeeper (" + host + ") within 2x zookeeper timeout period " + timeout);
+
- } catch (UnknownHostException uhe) {
- // do not expect to recover from this
- log.warn(uhe.getClass().getName() + " : " + uhe.getMessage());
- throw new RuntimeException(uhe);
+ } catch (IOException e) {
++ if (e instanceof UnknownHostException) {
++ /*
++ Make sure we wait at least as long as the JVM TTL for negative DNS responses
++ */
++ sleepTime = Math.max(sleepTime, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) e) + 1) * 1000);
++ }
+ log.warn("Connection to zooKeeper failed, will try again in " + String.format("%.2f secs", sleepTime / 1000.0), e);
+ } finally {
+ if (tryAgain && zooKeeper != null)
+ try {
+ zooKeeper.close();
+ zooKeeper = null;
+ } catch (InterruptedException e) {
+ log.warn("interrupted", e);
+ }
+ }
+
+ if (tryAgain) {
+ UtilWaitThread.sleep(sleepTime);
+ if (sleepTime < 10000)
+ sleepTime = (int) (sleepTime + sleepTime * Math.random());
+ }
+ }
+
+ return zooKeeper;
+ }
+
+ public static synchronized ZooKeeper getSession(String zooKeepers, int timeout) {
+ return getSession(zooKeepers, timeout, null, null);
+ }
+
+ public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String scheme, byte[] auth) {
+
+ if (sessions == null)
+ throw new ZooSessionShutdownException();
+
+ String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth);
+
+ // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
+ String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null);
+ ZooSessionInfo zsi = sessions.get(sessionKey);
+ if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+ if (auth != null && sessions.get(readOnlySessionKey) == zsi)
+ sessions.remove(readOnlySessionKey);
+ zsi = null;
+ sessions.remove(sessionKey);
+ }
+
+ if (zsi == null) {
+ ZooWatcher watcher = new ZooWatcher();
+ log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth");
+ zsi = new ZooSessionInfo(connect(zooKeepers, timeout, scheme, auth, watcher), watcher);
+ sessions.put(sessionKey, zsi);
+ if (auth != null && !sessions.containsKey(readOnlySessionKey))
+ sessions.put(readOnlySessionKey, zsi);
+ }
+ return zsi.zooKeeper;
+ }
+
+ public static synchronized void shutdown() {
+ for (ZooSessionInfo zsi : sessions.values()) {
+ try {
+ zsi.zooKeeper.close();
+ } catch (Exception e) {
+ log.debug("Error closing zookeeper during shutdown", e);
+ }
+ }
+
+ sessions = null;
+ }
+}
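
A hypothetical caller of the session cache above; hosts and timeout are placeholders:

    import org.apache.accumulo.fate.zookeeper.ZooSession;
    import org.apache.zookeeper.ZooKeeper;

    class SessionSketch {
      public static void main(String[] args) {
        // Sessions are cached per (keepers, timeout, scheme, auth) key, so repeated
        // calls with the same arguments share one ZooKeeper connection.
        ZooKeeper zk = ZooSession.getSession("zkhost1:2181,zkhost2:2181", 30000);
        System.out.println("session state: " + zk.getState());
      }
    }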
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java
----------------------------------------------------------------------
diff --cc fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java
index 0000000,0000000..aca4571
new file mode 100644
--- /dev/null
+++ b/fate/src/test/java/org/apache/accumulo/fate/util/AddressUtilTest.java
@@@ -1,0 -1,0 +1,95 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.fate.util;
++
++import java.security.Security;
++
++import junit.framework.TestCase;
++
++import org.apache.log4j.Logger;
++
++/**
++ * Test the AddressUtil class.
++ *
++ */
++public class AddressUtilTest extends TestCase {
++
++ private static final Logger log = Logger.getLogger(AddressUtilTest.class);
++
++ public void testGetNegativeTtl() {
++ log.info("Checking that we can get the ttl on dns failures.");
++ int expectedTtl = 20;
++ boolean expectException = false;
++ /* TODO replace all of this with Powermock on the Security class */
++ try {
++ Security.setProperty("networkaddress.cache.negative.ttl", Integer.toString(expectedTtl));
++ } catch (SecurityException exception) {
++ log.warn("We can't set the DNS cache period, so we're only testing fetching the system value.");
++ expectedTtl = 10;
++ }
++ try {
++ expectedTtl = Integer.parseInt(Security.getProperty("networkaddress.cache.negative.ttl"));
++ } catch (SecurityException exception) {
++ log.debug("Security manager won't let us fetch the property, testing default path.");
++ expectedTtl = 10;
++ } catch (NumberFormatException exception) {
++ log.debug("property isn't a number, testing default path.");
++ expectedTtl = 10;
++ }
++ if (-1 == expectedTtl) {
++ log.debug("property is set to 'forever', testing exception path");
++ expectException = true;
++ }
++ if (0 > expectedTtl) {
++ log.debug("property is a negative value other than 'forever', testing default path.");
++ expectedTtl = 10;
++ }
++ try {
++ if (expectException) {
++ log.info("AddressUtil is (hopefully) going to spit out an error about DNS lookups. you can ignore it.");
++ }
++ int result = AddressUtil.getAddressCacheNegativeTtl(null);
++ if (expectException) {
++ fail("The JVM Security settings cache DNS failures forever. In this case we expect an exception but didn't get one.");
++ }
++ assertEquals("Didn't get the ttl we expected", expectedTtl, result);
++ } catch (IllegalArgumentException exception) {
++ if (!expectException) {
++ log.error("Got an exception when we weren't expecting.", exception);
++ fail("We only expect to throw an IllegalArgumentException when the JVM caches DNS failures forever.");
++ }
++ }
++ }
++
++ public void testGetNegativeTtlThrowsOnForever() {
++ log.info("When DNS is cached forever, we should throw.");
++ /* TODO replace all of this with Powermock on the Security class */
++ try {
++ Security.setProperty("networkaddress.cache.negative.ttl", "-1");
++ } catch (SecurityException exception) {
++ log.error("We can't set the DNS cache period, so this test is effectively ignored.");
++ return;
++ }
++ try {
++ log.info("AddressUtil is (hopefully) going to spit out an error about DNS lookups. you can ignore it.");
++ AddressUtil.getAddressCacheNegativeTtl(null);
++ fail("The JVM Security settings cache DNS failures forever, this should cause an exception.");
++ } catch (IllegalArgumentException exception) {
++ assertTrue(true);
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/Accumulo.java
index 33bb871,0000000..ce99245
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,309 -1,0 +1,330 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.trace.DistributedTrace;
++import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+public class Accumulo {
+
+ private static final Logger log = Logger.getLogger(Accumulo.class);
+
+ public static synchronized void updateAccumuloVersion(FileSystem fs) {
+ try {
+ if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
+ fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.DATA_VERSION));
+ fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION), false);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
+ int dataVersion;
+ try {
+ FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation());
+ if (files == null || files.length == 0) {
+ dataVersion = -1; // assume it is 0.5 or earlier
+ } else {
+ dataVersion = Integer.parseInt(files[0].getPath().getName());
+ }
+ return dataVersion;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
+ }
+ }
+
+ public static void enableTracing(String address, String application) {
+ try {
+ DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
+ } catch (Exception ex) {
+ log.error("creating remote sink for trace spans", ex);
+ }
+ }
+
+ private static class LogMonitor extends FileWatchdog implements Watcher {
+ String path;
+
+ protected LogMonitor(String instance, String filename, int delay) {
+ super(filename);
+ setDelay(delay);
+ this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+ }
+
+ private void setMonitorPort() {
+ try {
+ String port = new String(ZooReaderWriter.getInstance().getData(path, null));
+ System.setProperty("org.apache.accumulo.core.host.log.port", port);
+ log.info("Changing monitor log4j port to "+port);
+ doOnChange();
+ } catch (Exception e) {
+ log.error("Error reading zookeeper data for monitor log4j port", e);
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+ setMonitorPort();
+ log.info("Set watch for monitor log4j port");
+ } catch (Exception e) {
+ log.error("Unable to set watch for monitor log4j port " + path);
+ }
+ super.run();
+ }
+
+ @Override
+ protected void doOnChange() {
+ LogManager.resetConfiguration();
+ new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ setMonitorPort();
+ if (event.getPath() != null) {
+ try {
+ ZooReaderWriter.getInstance().exists(event.getPath(), this);
+ } catch (Exception ex) {
+ log.error("Unable to reset watch for monitor log4j port", ex);
+ }
+ }
+ }
+ }
+
+ public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
+
+ System.setProperty("org.apache.accumulo.core.application", application);
+
+ if (System.getenv("ACCUMULO_LOG_DIR") != null)
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
+ else
+ System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
+
+ String localhost = InetAddress.getLocalHost().getHostName();
+ System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
+
+ if (System.getenv("ACCUMULO_LOG_HOST") != null)
+ System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
+ else
+ System.setProperty("org.apache.accumulo.core.host.log", localhost);
+
+ int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+ System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
+
+ // Use a specific log config, if it exists
+ String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
+ if (!new File(logConfig).exists()) {
+ // otherwise, use the generic config
+ logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+ }
+ // Turn off messages about not being able to reach the remote logger... we protect against that.
+ LogLog.setQuietMode(true);
+
+ // Configure logging
+ if (logPort==0)
+ new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
+ else
+ DOMConfigurator.configureAndWatch(logConfig, 5000);
+
+ log.info(application + " starting");
+ log.info("Instance " + config.getInstance().getInstanceID());
+ int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
+ log.info("Data Version " + dataVersion);
+ Accumulo.waitForZookeeperAndHdfs(fs);
+
+ Version codeVersion = new Version(Constants.VERSION);
+ if (dataVersion != Constants.DATA_VERSION && dataVersion != Constants.PREV_DATA_VERSION) {
+ throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
+ }
+
+ TreeMap<String,String> sortedProps = new TreeMap<String,String>();
+ for (Entry<String,String> entry : config.getConfiguration())
+ sortedProps.put(entry.getKey(), entry.getValue());
+
+ for (Entry<String,String> entry : sortedProps.entrySet()) {
+ if (entry.getKey().toLowerCase().contains("password") || entry.getKey().toLowerCase().contains("secret")
+ || entry.getKey().startsWith(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey()))
+ log.info(entry.getKey() + " = <hidden>");
+ else
+ log.info(entry.getKey() + " = " + entry.getValue());
+ }
+
+ monitorSwappiness();
+ }
+
+ /**
+ *
+ */
+ public static void monitorSwappiness() {
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String procFile = "/proc/sys/vm/swappiness";
+ File swappiness = new File(procFile);
+ if (swappiness.exists() && swappiness.canRead()) {
+ InputStream is = new FileInputStream(procFile);
+ try {
+ byte[] buffer = new byte[10];
+ int bytes = is.read(buffer);
+ String setting = new String(buffer, 0, bytes);
+ setting = setting.trim();
+ if (bytes > 0 && Integer.parseInt(setting) > 10) {
+ log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
+ + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
+ }
+ } finally {
+ is.close();
+ }
+ }
+ } catch (Throwable t) {
+ log.error(t, t);
+ }
+ }
+ }, 1000, 10 * 60 * 1000);
+ }
+
+ public static String getLocalAddress(String[] args) throws UnknownHostException {
+ InetAddress result = InetAddress.getLocalHost();
+ for (int i = 0; i < args.length - 1; i++) {
+ if (args[i].equals("-a") || args[i].equals("--address")) {
+ result = InetAddress.getByName(args[i + 1]);
+ log.debug("Local address is: " + args[i + 1] + " (" + result.toString() + ")");
+ break;
+ }
+ }
+ return result.getHostName();
+ }
+
+ public static void waitForZookeeperAndHdfs(FileSystem fs) {
+ log.info("Attempting to talk to zookeeper");
+ while (true) {
+ try {
+ ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
+ break;
+ } catch (InterruptedException e) {
+ // ignored
+ } catch (KeeperException ex) {
+ log.info("Waiting for accumulo to be initialized");
+ UtilWaitThread.sleep(1000);
+ }
+ }
+ log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
+ long sleep = 1000;
++ int unknownHostTries = 3;
+ while (true) {
+ try {
+ if (!isInSafeMode(fs))
+ break;
+ log.warn("Waiting for the NameNode to leave safemode");
+ } catch (IOException ex) {
- log.warn("Unable to connect to HDFS");
++ log.warn("Unable to connect to HDFS", ex);
++ } catch (IllegalArgumentException exception) {
++ /* Unwrap the UnknownHostException so we can deal with it directly */
++ if (exception.getCause() instanceof UnknownHostException) {
++ if (unknownHostTries > 0) {
++ log.warn("Unable to connect to HDFS, will retry. cause: " + exception.getCause());
++ /* We need to make sure our sleep period is long enough to avoid getting a cached failure of the host lookup. */
++ sleep = Math.max(sleep, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException)(exception.getCause()))+1)*1000);
++ } else {
++ log.error("Unable to connect to HDFS and have exceeded max number of retries.", exception);
++ throw exception;
++ }
++ unknownHostTries--;
++ } else {
++ throw exception;
++ }
+ }
- log.info("Sleeping " + sleep / 1000. + " seconds");
++ log.info("Backing off due to failure; current sleep period is " + sleep / 1000. + " seconds");
+ UtilWaitThread.sleep(sleep);
++ /* Back off to give transient failures more time to clear. */
+ sleep = Math.min(60 * 1000, sleep * 2);
+ }
+ log.info("Connected to HDFS");
+ }
+
+ private static boolean isInSafeMode(FileSystem fs) throws IOException {
+ if (!(fs instanceof DistributedFileSystem))
+ return false;
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+ // Becomes this:
+ Class<?> safeModeAction;
+ try {
+ // hadoop 2.0
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ try {
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot figure out the right class for Constants");
+ }
+ }
+ Object get = null;
+ for (Object obj : safeModeAction.getEnumConstants()) {
+ if (obj.toString().equals("SAFEMODE_GET"))
+ get = obj;
+ }
+ if (get == null) {
+ throw new RuntimeException("cannot find SAFEMODE_GET");
+ }
+ try {
+ Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+ return (Boolean) setSafeMode.invoke(dfs, get);
++ } catch (IllegalArgumentException exception) {
++ /* Send IAEs back as-is, so that those that wrap UnknownHostException can be handled in the same place as similar sources of failure. */
++ throw exception;
+ } catch (Exception ex) {
- throw new RuntimeException("cannot find method setSafeMode");
++ throw new RuntimeException("cannot find method setSafeMode", ex);
+ }
+ }
+}
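
The backoff schedule waitForZookeeperAndHdfs uses above, shown in isolation (a sketch):

    class BackoffSketch {
      public static void main(String[] args) {
        long sleep = 1000;
        for (int attempt = 1; attempt <= 8; attempt++) {
          System.out.println("attempt " + attempt + ": sleeping " + sleep / 1000. + "s");
          sleep = Math.min(60 * 1000, sleep * 2); // double each time, capped at one minute
        }
      }
    }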
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
index 6f10e17,0000000..f13d3a9
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
@@@ -1,234 -1,0 +1,242 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.tableOps;
+
+import java.io.IOException;
++import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.TableOperation;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.GrepIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.master.state.tables.TableManager;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+class CleanUp extends MasterRepo {
+
+ final private static Logger log = Logger.getLogger(CleanUp.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private String tableId;
+
+ private long creationTime;
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+
+ /*
+ * handle the case where we start executing on a new machine where the current time is in the past relative to the previous machine
+ *
+ * if the new machine has time in the future, that will work ok w/ hasCycled
+ */
+ if (System.currentTimeMillis() < creationTime) {
+ creationTime = System.currentTimeMillis();
+ }
+
+ }
+
+ public CleanUp(String tableId) {
+ this.tableId = tableId;
+ creationTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public long isReady(long tid, Master master) throws Exception {
+ if (!master.hasCycled(creationTime)) {
+ return 50;
+ }
+
+ boolean done = true;
+ Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+ Scanner scanner = master.getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+ MetaDataTableScanner.configureScanner(scanner, master);
+ scanner.setRange(tableRange);
+
+ KeyExtent prevExtent = null;
+ for (Entry<Key,Value> entry : scanner) {
+ TabletLocationState locationState = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
+ if (!locationState.extent.isPreviousExtent(prevExtent)) {
+ log.debug("Still waiting for table to be deleted: " + tableId + " saw inconsistency" + prevExtent + " " + locationState.extent);
+ done = false;
+ break;
+ }
+ prevExtent = locationState.extent;
+
+ TabletState state = locationState.getState(master.onlineTabletServers());
+ if (state.equals(TabletState.ASSIGNED) || state.equals(TabletState.HOSTED)) {
+ log.debug("Still waiting for table to be deleted: " + tableId + " locationState: " + locationState);
+ done = false;
+ break;
+ }
+ }
+
+ if (!done)
+ return 50;
+
+ return 0;
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master master) throws Exception {
+
+ master.clearMigrations(tableId);
+
+ int refCount = 0;
+
+ try {
+ // look for other tables that reference this table's files
+ Connector conn = master.getConnector();
+ BatchScanner bs = conn.createBatchScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 8);
+ try {
+ bs.setRanges(Collections.singleton(Constants.NON_ROOT_METADATA_KEYSPACE));
+ bs.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
+ IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class);
+ GrepIterator.setTerm(cfg, "../" + tableId + "/");
+ bs.addScanIterator(cfg);
+
+ for (Entry<Key,Value> entry : bs) {
+ if (entry.getKey().getColumnQualifier().toString().startsWith("../" + tableId + "/")) {
+ refCount++;
+ }
+ }
+ } finally {
+ bs.close();
+ }
+
+ } catch (Exception e) {
+ refCount = -1;
+ log.error("Failed to scan !METADATA looking for references to deleted table " + tableId, e);
+ }
+
+ // remove metadata table entries
+ try {
+ // Intentionally do not pass master lock. If master loses lock, this operation may complete before master can kill itself.
+ // If the master lock were passed to deleteTable, it is possible that the delete mutations would be dropped. If the delete operations
+ // are dropped and the operation completes, then the deletes will not be repeated.
+ MetadataTable.deleteTable(tableId, refCount != 0, SecurityConstants.getSystemCredentials(), null);
+ } catch (Exception e) {
+ log.error("error deleting " + tableId + " from metadata table", e);
+ }
+
+ // remove any problem reports the table may have
+ try {
+ ProblemReports.getInstance().deleteProblemReports(tableId);
+ } catch (Exception e) {
+ log.error("Failed to delete problem reports for table " + tableId, e);
+ }
+
+ if (refCount == 0) {
+ // delete the map files
+ try {
+ FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ fs.delete(new Path(ServerConstants.getTablesDir(), tableId), true);
+ } catch (IOException e) {
+ log.error("Unable to remove deleted table directory", e);
++ } catch (IllegalArgumentException exception) {
++ if (exception.getCause() instanceof UnknownHostException) {
++ /* Thrown if HDFS encounters a DNS problem in some edge cases */
++ log.error("Unable to remove deleted table directory", exception);
++ } else {
++ throw exception;
++ }
+ }
+ }
+
+ // remove table from zookeeper
+ try {
+ TableManager.getInstance().removeTable(tableId);
+ Tables.clearCache(master.getInstance());
+ } catch (Exception e) {
+ log.error("Failed to find table id in zookeeper", e);
+ }
+
+ // remove any permissions associated with this table
+ try {
+ AuditedSecurityOperation.getInstance().deleteTable(SecurityConstants.getSystemCredentials(), tableId);
+ } catch (ThriftSecurityException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ Utils.unreserveTable(tableId, tid, true);
+
+ Logger.getLogger(CleanUp.class).debug("Deleted table " + tableId);
+
+ return null;
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ // nothing to do
+ }
+
+}
+
+public class DeleteTable extends MasterRepo {
+
+ private static final long serialVersionUID = 1L;
+
+ private String tableId;
+
+ public DeleteTable(String tableId) {
+ this.tableId = tableId;
+ }
+
+ @Override
+ public long isReady(long tid, Master environment) throws Exception {
+ return Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
+ }
+
+ @Override
+ public Repo<Master> call(long tid, Master environment) throws Exception {
+ TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
+ environment.getEventCoordinator().event("deleting table %s ", tableId);
+ return new CleanUp(tableId);
+ }
+
+ @Override
+ public void undo(long tid, Master environment) throws Exception {
+ Utils.unreserveTable(tableId, tid, true);
+ }
+
+}
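
The unwrap-and-decide pattern CleanUp.call uses above, as a standalone sketch (the logging is a placeholder):

    import java.net.UnknownHostException;

    class UnwrapSketch {
      static void handleDeleteFailure(IllegalArgumentException exception) {
        if (exception.getCause() instanceof UnknownHostException) {
          // HDFS wraps DNS failures this way in some edge cases; log and move on.
          System.err.println("DNS trouble removing table directory: " + exception.getCause());
        } else {
          throw exception; // anything else really is an illegal argument
        }
      }
    }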
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4cd3b1b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
index 8e4af64,0000000..dbf0c3c
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
@@@ -1,498 -1,0 +1,500 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReportingIterator;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public class Compactor implements Callable<CompactionStats> {
+
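+ // Wraps the merged source iterator and counts every entry that passes through,
+ // feeding the entriesRead statistic that backs active-compaction reporting.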
+ public class CountingIterator extends WrappingIterator {
+
+ private long count;
+
+ public CountingIterator deepCopy(IteratorEnvironment env) {
+ return new CountingIterator(this, env);
+ }
+
+ private CountingIterator(CountingIterator other, IteratorEnvironment env) {
+ setSource(other.getSource().deepCopy(env));
+ count = 0;
+ }
+
+ public CountingIterator(SortedKeyValueIterator<Key,Value> source) {
+ this.setSource(source);
+ count = 0;
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void next() throws IOException {
+ super.next();
+ count++;
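+ // Fold the local count into the shared AtomicLong in batches of 1024 so
+ // each next() call does not pay for a contended atomic update.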
+ if (count % 1024 == 0) {
+ entriesRead.addAndGet(1024);
+ }
+ }
+
+ public long getCount() {
+ return count;
+ }
+ }
+
+ private static final Logger log = Logger.getLogger(Compactor.class);
+
+ static class CompactionCanceledException extends Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
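+ // Supplied by the caller so a long-running compaction can ask whether it
+ // should keep going and which iterator scope (minc or majc) it runs under.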
+ static interface CompactionEnv {
+ boolean isCompactionEnabled();
+
+ IteratorScope getIteratorScope();
+ }
+
+ private Map<String,DataFileValue> filesToCompact;
+ private InMemoryMap imm;
+ private String outputFile;
+ private boolean propogateDeletes;
+ private TableConfiguration acuTableConf;
+ private CompactionEnv env;
+ private Configuration conf;
+ private FileSystem fs;
+ protected KeyExtent extent;
+ private List<IteratorSetting> iterators;
+
+ // things to report
+ private String currentLocalityGroup = "";
+ private long startTime;
+
+ private MajorCompactionReason reason;
+ protected MinorCompactionReason mincReason;
+
+ private AtomicLong entriesRead = new AtomicLong(0);
+ private AtomicLong entriesWritten = new AtomicLong(0);
+ private DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+
+ private synchronized void setLocalityGroup(String name) {
+ this.currentLocalityGroup = name;
+ }
+
+ private void clearStats() {
+ entriesRead.set(0);
+ entriesWritten.set(0);
+ }
+
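+ // Every in-flight compaction registers itself here so it can be reported
+ // via getRunningCompactions() below.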
+ protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
+
+ public static class CompactionInfo {
+
+ private Compactor compactor;
+ private String localityGroup;
+ private long entriesRead;
+ private long entriesWritten;
+
+ CompactionInfo(Compactor compactor) {
+ this.localityGroup = compactor.currentLocalityGroup;
+ this.entriesRead = compactor.entriesRead.get();
+ this.entriesWritten = compactor.entriesWritten.get();
+ this.compactor = compactor;
+ }
+
+ public ActiveCompaction toThrift() {
+
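+ // Classify the compaction: an in-memory map means a minor compaction (a
+ // merging minor if data files are also involved); otherwise it is a major
+ // compaction, reported as FULL when deletes are being dropped.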
+ CompactionType type;
+
+ if (compactor.imm != null)
+ if (compactor.filesToCompact.size() > 0)
+ type = CompactionType.MERGE;
+ else
+ type = CompactionType.MINOR;
+ else if (!compactor.propogateDeletes)
+ type = CompactionType.FULL;
+ else
+ type = CompactionType.MAJOR;
+
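+ // Map the internal reason enum onto its thrift counterpart; minor
+ // compactions use mincReason, major compactions use reason.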
+ CompactionReason reason;
+
+ if (compactor.imm != null)
+ switch (compactor.mincReason) {
+ case USER:
+ reason = CompactionReason.USER;
+ break;
+ case CLOSE:
+ reason = CompactionReason.CLOSE;
+ break;
+ case SYSTEM:
+ default:
+ reason = CompactionReason.SYSTEM;
+ break;
+ }
+ else
+ switch (compactor.reason) {
+ case USER:
+ reason = CompactionReason.USER;
+ break;
+ case CHOP:
+ reason = CompactionReason.CHOP;
+ break;
+ case IDLE:
+ reason = CompactionReason.IDLE;
+ break;
+ case NORMAL:
+ default:
+ reason = CompactionReason.SYSTEM;
+ break;
+ }
+
+ List<IterInfo> iiList = new ArrayList<IterInfo>();
+ Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
+
+ for (IteratorSetting iterSetting : compactor.iterators) {
+ iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
+ iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
+ }
+
+ return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, new ArrayList<String>(
+ compactor.filesToCompact.keySet()), compactor.outputFile, type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+ }
+ }
+
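+ // Snapshot the registered compactions while holding the set's monitor so
+ // callers get a consistent list without racing concurrent add/remove.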
+ public static List<CompactionInfo> getRunningCompactions() {
+ ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>();
+
+ synchronized (runningCompactions) {
+ for (Compactor compactor : runningCompactions) {
+ compactions.add(new CompactionInfo(compactor));
+ }
+ }
+
+ return compactions;
+ }
+
+ Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+ TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
+ this.extent = extent;
+ this.conf = conf;
+ this.fs = fs;
+ this.filesToCompact = files;
+ this.imm = imm;
+ this.outputFile = outputFile;
+ this.propogateDeletes = propogateDeletes;
+ this.acuTableConf = acuTableConf;
+ this.env = env;
+ this.iterators = iterators;
+ this.reason = reason;
+
+ startTime = System.currentTimeMillis();
+ }
+
+ Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+ TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
+ this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
+ }
+
+ public FileSystem getFileSystem() {
+ return fs;
+ }
+
+ KeyExtent getExtent() {
+ return extent;
+ }
+
+ String getOutputFile() {
+ return outputFile;
+ }
+
+ @Override
+ public CompactionStats call() throws IOException, CompactionCanceledException {
+
+ FileSKVWriter mfw = null;
+
+ CompactionStats majCStats = new CompactionStats();
+
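+ // Register with the running set; remember whether we actually added
+ // ourselves so the finally block only removes what this call registered.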
+ boolean remove = runningCompactions.add(this);
+
+ clearStats();
+
+ String oldThreadName = Thread.currentThread().getName();
+ String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
+ Thread.currentThread().setName(newThreadName);
+ try {
+ FileOperations fileFactory = FileOperations.getInstance();
+ mfw = fileFactory.openWriter(outputFile, fs, conf, acuTableConf);
+
+ Map<String,Set<ByteSequence>> lGroups;
+ try {
+ lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
+ } catch (LocalityGroupConfigurationError e) {
+ throw new IOException(e);
+ }
+
+ long t1 = System.currentTimeMillis();
+
+ HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
+
+ if (mfw.supportsLocalityGroups()) {
+ for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
+ setLocalityGroup(entry.getKey());
+ compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
+ allColumnFamilies.addAll(entry.getValue());
+ }
+ }
+
+ setLocalityGroup("");
+ compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
+
+ long t2 = System.currentTimeMillis();
+
+ FileSKVWriter mfwTmp = mfw;
+ mfw = null; // set this to null so we do not try to close it again in finally if the close fails
+ mfwTmp.close(); // if the close fails it will cause the compaction to fail
+
+ // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
+ try {
+ FileSKVIterator openReader = fileFactory.openReader(outputFile, false, fs, conf, acuTableConf);
+ openReader.close();
+ } catch (IOException ex) {
+ log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
+ throw ex;
+ }
+
+ log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
+ majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
+
+ majCStats.setFileSize(fileFactory.getFileSize(outputFile, fs, conf, acuTableConf));
+ return majCStats;
+ } catch (IOException e) {
+ log.error(e, e);
+ throw e;
+ } catch (RuntimeException e) {
+ log.error(e, e);
+ throw e;
+ } finally {
+ Thread.currentThread().setName(oldThreadName);
+ if (remove)
+ runningCompactions.remove(this);
+
+ try {
+ if (mfw != null) {
+ // compaction must not have finished successfully, so close its output file
+ try {
+ mfw.close();
+ } finally {
+ Path path = new Path(outputFile);
+ if (!fs.delete(path, true))
+ if (fs.exists(path))
+ log.error("Unable to delete " + outputFile);
+ }
+ }
+ } catch (IOException e) {
+ log.warn(e, e);
++ } catch (RuntimeException exception) {
++ log.warn(exception, exception);
+ }
+ }
+ }
+
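+ // Opens a reader for every input file, wrapping each one so read problems
+ // are reported per file and logical timestamps are applied when the
+ // metadata recorded a time for that file. On any failure, every reader
+ // opened so far is closed before the exception is rethrown.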
+ private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
+
+ List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
+
+ for (String mapFile : filesToCompact.keySet()) {
+ try {
+
+ FileOperations fileFactory = FileOperations.getInstance();
+
+ FileSKVIterator reader;
+
+ reader = fileFactory.openReader(mapFile, false, fs, conf, acuTableConf);
+
+ readers.add(reader);
+
+ SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile, false, reader);
+
+ if (filesToCompact.get(mapFile).isTimeSet()) {
+ iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
+ }
+
+ iters.add(iter);
+
+ } catch (Throwable e) {
+
+ ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile, e));
+
+ log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
+ // failed to open some map file... close the ones that were opened
+ for (FileSKVIterator reader : readers) {
+ try {
+ reader.close();
+ } catch (Throwable e2) {
+ log.warn("Failed to close map file", e2);
+ }
+ }
+
+ readers.clear();
+
+ if (e instanceof IOException)
+ throw (IOException) e;
+ throw new IOException("Failed to open map data files", e);
+ }
+ }
+
+ return iters;
+ }
+
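+ // Compacts a single locality group (or, with inclusive == false, everything
+ // outside the groups already written) into the open output writer.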
+ private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
+ throws IOException, CompactionCanceledException {
+ ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
+ Span span = Trace.start("compact");
+ try {
+ long entriesCompacted = 0;
+ List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
+
+ if (imm != null) {
+ iters.add(imm.compactionIterator());
+ }
+
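+ // Stack the system iterators: merge all sources over the tablet's range,
+ // count entries read, suppress deleted cells (unless deletes must be
+ // propagated), and skip column families the seek does not request.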
+ CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()));
+ DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
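+ // Build the iterator environment for the configured scope; for majc the
+ // second argument (!propogateDeletes) marks this as a full compaction,
+ // matching the FULL classification used in toThrift() above.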
+ TabletIteratorEnvironment iterEnv;
+ if (env.getIteratorScope() == IteratorScope.majc)
+ iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
+ else if (env.getIteratorScope() == IteratorScope.minc)
+ iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
+ else
+ throw new IllegalArgumentException();
+
+ SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
+ iterators, iterEnv));
+
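+ // Restrict the merged view to this tablet's range and to the requested
+ // column families (inclusive) or their complement (exclusive).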
+ itr.seek(extent.toDataRange(), columnFamilies, inclusive);
+
+ if (!inclusive) {
+ mfw.startDefaultLocalityGroup();
+ } else {
+ mfw.startNewLocalityGroup(lgName, columnFamilies);
+ }
+
+ Span write = Trace.start("write");
+ try {
+ while (itr.hasTop() && env.isCompactionEnabled()) {
+ mfw.append(itr.getTopKey(), itr.getTopValue());
+ itr.next();
+ entriesCompacted++;
+
+ if (entriesCompacted % 1024 == 0) {
+ // Periodically update stats; we do not want to do this too often since it is a volatile write
+ entriesWritten.addAndGet(1024);
+ }
+ }
+
+ if (itr.hasTop() && !env.isCompactionEnabled()) {
+ // cancel major compaction operation
+ try {
+ try {
+ mfw.close();
+ } catch (IOException e) {
+ log.error(e, e);
+ }
+ fs.delete(new Path(outputFile), true);
+ } catch (Exception e) {
+ log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
+ }
+ throw new CompactionCanceledException();
+ }
+
+ } finally {
+ CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
+ majCStats.add(lgMajcStats);
+ write.stop();
+ }
+
+ } finally {
+ // close all of the readers that were opened
+ for (FileSKVIterator reader : readers) {
+ try {
+ reader.close();
+ } catch (Throwable e) {
+ log.warn("Failed to close map file", e);
+ }
+ }
+ span.stop();
+ }
+ }
+
+}