You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:38 UTC
[32/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java b/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java
new file mode 100644
index 0000000..738d6c8
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.rpc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServerWithFix;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.utils.Networks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @param <T> The type of service handler interface.
+ * @param <I> The type of the thrift service.
+ */
+public final class ThriftRPCServer<T extends RPCServiceHandler, I> extends AbstractExecutionThreadService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftRPCServer.class);
+
+ private final String name;
+ private final int ioThreads;
+ private final int workerThreads;
+ private final int maxReadBufferBytes;
+ private final T serviceHandler;
+ private final TProcessor processor;
+
+ private InetSocketAddress bindAddress;
+ private ExecutorService executor;
+ private TServer server;
+
+  /**
+   * Returns a fresh {@link Builder} used to configure and construct a {@link ThriftRPCServer}.
+   *
+   * @param serviceType Class of the thrift service.
+   * @param <I> Type of the thrift service.
+   * @return A new {@link Builder} for the given service type.
+   */
+  public static <I> Builder<I> builder(Class<I> serviceType) {
+    Builder<I> serverBuilder = new Builder<I>(serviceType);
+    return serverBuilder;
+  }
+
+  /**
+   * Builder for creating instances of {@link ThriftRPCServer}. By default, the created instance binds to a
+   * random port and uses 2 io threads and max(1, number of cpu cores - 2) worker threads.
+   *
+   * @param <I> Type of the thrift service.
+   */
+  public static final class Builder<I> {
+    private final Class<I> serviceType;
+    private String name;
+    private InetSocketAddress bindAddress = new InetSocketAddress(0);
+    private int ioThreads = 2;
+    // Clamp to at least one worker: on hosts with one or two cores "cores - 2" is not positive and a
+    // default-configured server would be rejected by the "Worker threads must be > 0." check.
+    private int workerThreads = Math.max(1, Runtime.getRuntime().availableProcessors() - 2);
+    // 16Mb
+    private int maxReadBufferBytes = 16 * 1024 * 1024;
+
+    private Builder(Class<I> serviceType) {
+      this.serviceType = serviceType;
+      // The simple name of the service class is the default server name.
+      this.name = serviceType.getSimpleName();
+    }
+
+    /**
+     * Sets the name of the server, used in log messages and worker thread names.
+     */
+    public Builder<I> setName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    /**
+     * Sets the host to bind to, keeping the currently configured port.
+     */
+    public Builder<I> setHost(String host) {
+      this.bindAddress = new InetSocketAddress(host, bindAddress.getPort());
+      return this;
+    }
+
+    /**
+     * Sets the port to bind to, keeping the currently configured host name.
+     */
+    public Builder<I> setPort(int port) {
+      this.bindAddress = new InetSocketAddress(bindAddress.getHostName(), port);
+      return this;
+    }
+
+    /**
+     * Sets the number of io threads; the server requires a value greater than zero.
+     */
+    public Builder<I> setIOThreads(int count) {
+      this.ioThreads = count;
+      return this;
+    }
+
+    /**
+     * Sets the number of worker threads; the server requires a value greater than zero.
+     */
+    public Builder<I> setWorkerThreads(int count) {
+      this.workerThreads = count;
+      return this;
+    }
+
+    /**
+     * Sets the maximum read buffer size, in bytes, passed on to the thrift server.
+     */
+    public Builder<I> setMaxReadBufferBytes(int maxReadBufferBytes) {
+      this.maxReadBufferBytes = maxReadBufferBytes;
+      return this;
+    }
+
+    /**
+     * Creates the {@link ThriftRPCServer} from the current settings.
+     *
+     * @param serviceHandler Handler for handling client requests.
+     * @param <T> The type of service handler.
+     * @return A new, not yet started, {@link ThriftRPCServer}.
+     * @throws IllegalArgumentException if a thread count is not positive or the handler does not match the service.
+     */
+    public <T extends RPCServiceHandler> ThriftRPCServer<T, I> build(T serviceHandler) {
+      return new ThriftRPCServer<T, I>(bindAddress, ioThreads, workerThreads, maxReadBufferBytes,
+                                       serviceHandler, serviceType, name);
+    }
+  }
+
+  /**
+   * Creates a ThriftRPCServer with the given parameters.
+   *
+   * @param bindAddress The socket address for the server to listen on. If {@code null}, the server is bound to a
+   *                    random port on localhost when it starts up.
+   * @param ioThreads Number of io threads; must be greater than zero.
+   * @param workerThreads Number of worker threads; must be greater than zero.
+   * @param maxReadBufferBytes Maximum read buffer size, in bytes, passed on to the thrift server.
+   * @param serviceHandler Handler for handling client requests.
+   * @param serviceType Class of the thrift service, used to locate its Iface and processor classes.
+   * @param name Name of the server, used in log messages and worker thread names.
+   */
+  @SuppressWarnings("unchecked")
+  private ThriftRPCServer(InetSocketAddress bindAddress, int ioThreads,
+                          int workerThreads, int maxReadBufferBytes,
+                          T serviceHandler, Class<I> serviceType, String name) {
+    Preconditions.checkArgument(ioThreads > 0, "IO threads must be > 0.");
+    Preconditions.checkArgument(workerThreads > 0, "Worker threads must be > 0.");
+
+    this.name = name;
+    this.serviceHandler = serviceHandler;
+    this.bindAddress = bindAddress;
+    this.ioThreads = ioThreads;
+    this.workerThreads = workerThreads;
+    this.maxReadBufferBytes = maxReadBufferBytes;
+
+    // The handler is an instance of T, so its runtime class is treated as Class<T> (unchecked).
+    Class<T> handlerType = (Class<T>) serviceHandler.getClass();
+    this.processor = createProcessor(handlerType, serviceType);
+  }
+
+  /**
+   * Returns the address this server listens on. After start up this is the address that was
+   * actually chosen, including a randomly picked port if none was configured.
+   */
+  public InetSocketAddress getBindAddress() {
+    return this.bindAddress;
+  }
+
+  /**
+   * Resolves the listen address, creates the worker pool, initializes the service handler and
+   * constructs the thrift server. Serving itself happens in {@link #run()}.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    // Determine the address and port to listen on; no address or a non-positive port means "pick a random port".
+    InetSocketAddress listenOn = bindAddress;
+    if (listenOn == null) {
+      listenOn = new InetSocketAddress("localhost", Networks.getRandomPort());
+    } else if (listenOn.getPort() <= 0) {
+      listenOn = new InetSocketAddress(listenOn.getAddress(), Networks.getRandomPort());
+    }
+    bindAddress = listenOn;
+
+    // Worker pool: no core threads, at most workerThreads, idle threads reclaimed after 60 seconds.
+    // With a SynchronousQueue and CallerRunsPolicy, a task submitted while all workers are busy
+    // runs on the submitting thread instead of being queued.
+    String threadNameFormat = String.format("%s-rpc-%%d", name);
+    executor = new ThreadPoolExecutor(0, workerThreads,
+                                      60L, TimeUnit.SECONDS,
+                                      new SynchronousQueue<Runnable>(),
+                                      Threads.createDaemonThreadFactory(threadNameFormat),
+                                      new ThreadPoolExecutor.CallerRunsPolicy());
+    serviceHandler.init();
+
+    TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(listenOn);
+    TThreadedSelectorServerWithFix.Args serverArgs =
+      new TThreadedSelectorServerWithFix.Args(serverSocket)
+        .selectorThreads(ioThreads)
+        .protocolFactory(new TBinaryProtocol.Factory())
+        .transportFactory(new TFramedTransport.Factory())
+        .processor(processor)
+        .executorService(executor);
+
+    // ENG-443 - Set the max read buffer size. This is important as this will
+    // prevent the server from throwing OOME if telnetd to the port
+    // it's running on.
+    serverArgs.maxReadBufferBytes = maxReadBufferBytes;
+    server = new TThreadedSelectorServerWithFix(serverArgs);
+    LOG.info("Starting RPC server for {}", name);
+  }
+
+  /**
+   * Releases resources once the server is no longer serving: destroys the service handler and
+   * shuts down the worker pool.
+   *
+   * @throws Exception if the service handler fails to clean up; the worker pool is shut down regardless.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      serviceHandler.destroy();
+    } finally {
+      // Always stop the worker pool, even when the handler's cleanup throws, so no worker threads linger.
+      executor.shutdownNow();
+    }
+    LOG.info("RPC server for {} stopped.", name);
+  }
+
+  /**
+   * Logs the stop request and asks the thrift server to stop.
+   */
+  @Override
+  protected void triggerShutdown() {
+    LOG.info("Request to stop RPC server for {}", name);
+    this.server.stop();
+  }
+
+  /**
+   * Runs the thrift server's serve call, logging before it starts and after it returns.
+   */
+  @Override
+  protected void run() throws Exception {
+    LOG.info("Running RPC server for {}", name);
+    this.server.serve();
+    LOG.info("Done running RPC server for {}", name);
+  }
+
+  /**
+   * Builds the {@link TProcessor} of the given thrift service so that calls are dispatched to the service handler.
+   *
+   * @param handlerType Class of the service handler.
+   * @param serviceType Class of the thrift service; it must declare a nested {@code Iface} interface and a
+   *                    nested {@link TProcessor} implementation.
+   * @return A processor instance of the service's nested processor type.
+   * @throws IllegalArgumentException if either nested type is missing, or the handler does not implement Iface.
+   */
+  private TProcessor createProcessor(final Class<T> handlerType, Class<I> serviceType) {
+    // Pick the Iface inner interface and the Processor class
+    Class<? extends TProcessor> processorType = null;
+    Class<?> ifaceType = null;
+    for (Class<?> nested : serviceType.getDeclaredClasses()) {
+      if (TProcessor.class.isAssignableFrom(nested)) {
+        // Assignability was just checked, so asSubclass cannot fail here.
+        processorType = nested.asSubclass(TProcessor.class);
+        continue;
+      }
+      if (nested.isInterface() && "Iface".equals(nested.getSimpleName())) {
+        ifaceType = nested;
+      }
+    }
+
+    String serviceName = serviceType.getName();
+    Preconditions.checkArgument(processorType != null,
+                                "Missing TProcessor, %s is not a valid thrift service.", serviceName);
+    Preconditions.checkArgument(ifaceType != null,
+                                "Missing Iface, %s is not a valid thrift service.", serviceName);
+
+    // Only handlers that implement the Iface themselves are supported.
+    if (!ifaceType.isAssignableFrom(handlerType)) {
+      throw new IllegalArgumentException("Unsupported handler type.");
+    }
+    return createProxyProcessor(handlerType, processorType, ifaceType);
+  }
+
+  /**
+   * Instantiates the thrift processor on top of a dynamic proxy of the Iface. Every call made on the proxy is
+   * forwarded reflectively to the handler method with the same name and parameter types.
+   *
+   * @param handlerType Class of the service handler.
+   * @param processorType Processor class to instantiate; it must have a constructor taking the Iface.
+   * @param ifaceType The thrift Iface interface.
+   * @return A new processor instance wrapping the proxy.
+   */
+  private TProcessor createProxyProcessor(final Class<T> handlerType,
+                                          Class<? extends TProcessor> processorType, Class<?> ifaceType) {
+    try {
+      // Resolve each Iface method to its handler counterpart once, so calls skip the reflective lookup.
+      ImmutableMap.Builder<Method, Method> lookup = ImmutableMap.builder();
+      for (Method ifaceMethod : ifaceType.getMethods()) {
+        Method target = handlerType.getMethod(ifaceMethod.getName(), ifaceMethod.getParameterTypes());
+        if (!target.isAccessible()) {
+          target.setAccessible(true);
+        }
+        lookup.put(ifaceMethod, target);
+      }
+      final Map<Method, Method> handlerMethods = lookup.build();
+
+      InvocationHandler dispatcher = new InvocationHandler() {
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+          try {
+            return handlerMethods.get(method).invoke(serviceHandler, args);
+          } catch (InvocationTargetException e) {
+            // Surface the handler's own exception rather than the reflection wrapper when there is one.
+            Throwable cause = e.getCause();
+            throw cause != null ? cause : e;
+          }
+        }
+      };
+
+      Object ifaceProxy = Proxy.newProxyInstance(ifaceType.getClassLoader(), new Class[]{ifaceType}, dispatcher);
+      return processorType.getConstructor(ifaceType).newInstance(ifaceProxy);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java b/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java
new file mode 100644
index 0000000..96a5fe5
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes for writing RPC servers and clients in a simple manner.
+ */
+package org.apache.tephra.rpc;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java
new file mode 100644
index 0000000..b8ae111
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Guice module that exposes a given {@link Configuration} instance to the injector.
+ */
+public final class ConfigModule extends AbstractModule {
+
+  private final Configuration configuration;
+
+  /**
+   * @param configuration the instance that every injection of {@link Configuration} will receive
+   */
+  public ConfigModule(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   * Binds {@link Configuration} to the instance supplied at construction time.
+   */
+  @Override
+  protected void configure() {
+    bind(Configuration.class).toInstance(this.configuration);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java
new file mode 100644
index 0000000..b4e2d1b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.InMemoryDiscoveryService;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.ZKClientService;
+
+/**
+ * Factory of Google Guice modules that bind {@link DiscoveryService} and {@link DiscoveryServiceClient}
+ * for in-memory, single-node, and distributed operation.
+ */
+public final class DiscoveryModules {
+
+  /**
+   * Returns a module backed by the JVM-wide in-memory discovery service.
+   */
+  public Module getInMemoryModules() {
+    return new InMemoryDiscoveryModule();
+  }
+
+  /**
+   * Returns the module for single-node operation; it uses the same in-memory discovery as
+   * {@link #getInMemoryModules()}.
+   */
+  public Module getSingleNodeModules() {
+    return new InMemoryDiscoveryModule();
+  }
+
+  /**
+   * Returns a module backed by ZooKeeper-based discovery.
+   */
+  public Module getDistributedModules() {
+    return new ZKDiscoveryModule();
+  }
+
+  /**
+   * Binds both discovery interfaces to one shared {@link InMemoryDiscoveryService}.
+   */
+  private static final class InMemoryDiscoveryModule extends AbstractModule {
+
+    // ensuring to be singleton across JVM
+    private static final InMemoryDiscoveryService IN_MEMORY_DISCOVERY_SERVICE = new InMemoryDiscoveryService();
+
+    @Override
+    protected void configure() {
+      bind(DiscoveryService.class).toInstance(IN_MEMORY_DISCOVERY_SERVICE);
+      bind(DiscoveryServiceClient.class).toInstance(IN_MEMORY_DISCOVERY_SERVICE);
+    }
+  }
+
+  /**
+   * Exposes discovery bindings that delegate to a {@link ZKDiscoveryService}, starting the
+   * ZooKeeper client on first use if it is not running.
+   */
+  private static final class ZKDiscoveryModule extends PrivateModule {
+
+    @Override
+    protected void configure() {
+      expose(DiscoveryService.class);
+      expose(DiscoveryServiceClient.class);
+    }
+
+    /**
+     * Starts the given ZooKeeper client, waiting for it to be up, unless it is already running.
+     */
+    private static void ensureRunning(ZKClientService zkClient) {
+      if (!zkClient.isRunning()) {
+        zkClient.startAndWait();
+      }
+    }
+
+    @Provides
+    @Singleton
+    private ZKDiscoveryService providesZKDiscoveryService(ZKClientService zkClient) {
+      return new ZKDiscoveryService(zkClient);
+    }
+
+    @Provides
+    @Singleton
+    private DiscoveryService providesDiscoveryService(final ZKClientService zkClient,
+                                                      final ZKDiscoveryService delegate) {
+      return new DiscoveryService() {
+        @Override
+        public Cancellable register(Discoverable discoverable) {
+          ensureRunning(zkClient);
+          return delegate.register(discoverable);
+        }
+      };
+    }
+
+    @Provides
+    @Singleton
+    private DiscoveryServiceClient providesDiscoveryServiceClient(final ZKClientService zkClient,
+                                                                  final ZKDiscoveryService delegate) {
+      return new DiscoveryServiceClient() {
+        @Override
+        public ServiceDiscovered discover(String s) {
+          ensureRunning(zkClient);
+          return delegate.discover(s);
+        }
+      };
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java
new file mode 100644
index 0000000..6689d05
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.ThreadLocalClientProvider;
+import org.apache.tephra.distributed.ThriftClientProvider;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+
+/**
+ * Guice module that binds {@link ThriftClientProvider} to an implementation selected by configuration.
+ */
+public class TransactionClientModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(ThriftClientProvider.class).toProvider(ThriftClientProviderSupplier.class);
+  }
+
+  /**
+   * Creates the {@link ThriftClientProvider} implementation named by the
+   * {@link TxConstants.Service#CFG_DATA_TX_CLIENT_PROVIDER} setting.
+   */
+  @Singleton
+  private static final class ThriftClientProviderSupplier implements Provider<ThriftClientProvider> {
+
+    private final Configuration cConf;
+    // Injected optionally, so this stays null when no DiscoveryServiceClient binding exists.
+    private DiscoveryServiceClient discoveryServiceClient;
+
+    @Inject
+    ThriftClientProviderSupplier(Configuration cConf) {
+      this.cConf = cConf;
+    }
+
+    @Inject(optional = true)
+    void setDiscoveryServiceClient(DiscoveryServiceClient discoveryServiceClient) {
+      this.discoveryServiceClient = discoveryServiceClient;
+    }
+
+    /**
+     * @return a pooled provider for "pool", a thread-local provider for "thread-local"
+     * @throws IllegalArgumentException for any other configured provider name
+     */
+    @Override
+    public ThriftClientProvider get() {
+      // configure the client provider
+      String provider = cConf.get(TxConstants.Service.CFG_DATA_TX_CLIENT_PROVIDER,
+                                  TxConstants.Service.DEFAULT_DATA_TX_CLIENT_PROVIDER);
+      if ("pool".equals(provider)) {
+        return new PooledClientProvider(cConf, discoveryServiceClient);
+      }
+      if ("thread-local".equals(provider)) {
+        return new ThreadLocalClientProvider(cConf, discoveryServiceClient);
+      }
+      throw new IllegalArgumentException("Unknown Transaction Service Client Provider '" + provider + "'.");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
new file mode 100644
index 0000000..aaf3534
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+import org.apache.tephra.DefaultTransactionExecutor;
+import org.apache.tephra.TransactionExecutor;
+import org.apache.tephra.TransactionExecutorFactory;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.metrics.DefaultMetricsCollector;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+/**
+ * Guice bindings for running in distributed mode on a cluster.
+ */
+final class TransactionDistributedModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(SnapshotCodecProvider.class).in(Singleton.class);
+
+    // The HDFS-backed storage is registered under the name "persist"; the unnamed binding goes
+    // through TransactionStateStorageProvider.
+    bind(TransactionStateStorage.class)
+      .annotatedWith(Names.named("persist"))
+      .to(HDFSTransactionStateStorage.class)
+      .in(Singleton.class);
+    bind(TransactionStateStorage.class)
+      .toProvider(TransactionStateStorageProvider.class)
+      .in(Singleton.class);
+
+    bind(TransactionManager.class).in(Singleton.class);
+    bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
+    bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
+
+    installExecutorFactory();
+  }
+
+  /**
+   * Installs the assisted-inject factory that creates {@link DefaultTransactionExecutor} instances.
+   */
+  private void installExecutorFactory() {
+    install(new FactoryModuleBuilder()
+              .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
+              .build(TransactionExecutorFactory.class));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
new file mode 100644
index 0000000..de7678a
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import org.apache.tephra.DefaultTransactionExecutor;
+import org.apache.tephra.TransactionExecutor;
+import org.apache.tephra.TransactionExecutorFactory;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.NoOpTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+/**
+ * Guice bindings for running completely in-memory (no persistence). Intended for test classes only,
+ * since the transaction state cannot be recovered after a failure.
+ */
+public class TransactionInMemoryModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(SnapshotCodecProvider.class).in(Singleton.class);
+    bind(TransactionStateStorage.class)
+      .to(NoOpTransactionStateStorage.class)
+      .in(Singleton.class);
+    bind(TransactionManager.class).in(Singleton.class);
+    bind(TransactionSystemClient.class)
+      .to(InMemoryTxSystemClient.class)
+      .in(Singleton.class);
+    // no metrics output for in-memory
+    bind(MetricsCollector.class).to(TxMetricsCollector.class);
+
+    installExecutorFactory();
+  }
+
+  /**
+   * Installs the assisted-inject factory that creates {@link DefaultTransactionExecutor} instances.
+   */
+  private void installExecutorFactory() {
+    install(new FactoryModuleBuilder()
+              .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
+              .build(TransactionExecutorFactory.class));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
new file mode 100644
index 0000000..7d0b663
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+import org.apache.tephra.DefaultTransactionExecutor;
+import org.apache.tephra.TransactionExecutor;
+import org.apache.tephra.TransactionExecutorFactory;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.DefaultMetricsCollector;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.persist.LocalFileTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+/**
+ * Guice bindings for running in single-node mode (persistence to local disk and in-memory client).
+ */
+final class TransactionLocalModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(SnapshotCodecProvider.class).in(Singleton.class);
+
+    // The local-file storage is registered under the name "persist"; the unnamed binding goes
+    // through TransactionStateStorageProvider.
+    bind(TransactionStateStorage.class)
+      .annotatedWith(Names.named("persist"))
+      .to(LocalFileTransactionStateStorage.class)
+      .in(Singleton.class);
+    bind(TransactionStateStorage.class)
+      .toProvider(TransactionStateStorageProvider.class)
+      .in(Singleton.class);
+
+    bind(TransactionManager.class).in(Singleton.class);
+    bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
+    bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
+
+    installExecutorFactory();
+  }
+
+  /**
+   * Installs the assisted-inject factory that creates {@link DefaultTransactionExecutor} instances.
+   */
+  private void installExecutorFactory() {
+    install(new FactoryModuleBuilder()
+              .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
+              .build(TransactionExecutorFactory.class));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
new file mode 100644
index 0000000..a3fe1c1
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.Module;
+
+/**
+ * Factory of Google Guice modules for in-memory, single-node, and distributed operation.
+ */
+public class TransactionModules {
+
+  /**
+   * Returns the module for running completely in-memory.
+   */
+  public Module getInMemoryModules() {
+    Module inMemory = new TransactionInMemoryModule();
+    return inMemory;
+  }
+
+  /**
+   * Returns the module for running in single-node mode.
+   */
+  public Module getSingleNodeModules() {
+    Module singleNode = new TransactionLocalModule();
+    return singleNode;
+  }
+
+  /**
+   * Returns the module for running in distributed mode.
+   */
+  public Module getDistributedModules() {
+    Module distributed = new TransactionDistributedModule();
+    return distributed;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java
new file mode 100644
index 0000000..5456553
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.runtime;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.name.Names;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.NoOpTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+
+/**
+ * Guice {@link Provider} that selects the {@link TransactionStateStorage} implementation from configuration.
+ *
+ * <p>When {@link TxConstants.Manager#CFG_DO_PERSIST} is {@code true} (the default used when the setting is absent),
+ * the binding of {@link TransactionStateStorage} named {@code "persist"} is returned; otherwise a
+ * {@link NoOpTransactionStateStorage} is obtained from the injector.</p>
+ */
+@Singleton
+public final class TransactionStateStorageProvider implements Provider<TransactionStateStorage> {
+
+  private final Configuration cConf;
+  private final Injector injector;
+
+  /**
+   * @param cConf configuration consulted on every {@link #get()} call
+   * @param injector injector used to look up the chosen storage implementation
+   */
+  @Inject
+  TransactionStateStorageProvider(Configuration cConf, Injector injector) {
+    this.cConf = cConf;
+    this.injector = injector;
+  }
+
+  /**
+   * @return the storage implementation matching the current value of the persistence flag
+   */
+  @Override
+  public TransactionStateStorage get() {
+    boolean persist = cConf.getBoolean(TxConstants.Manager.CFG_DO_PERSIST, true);
+    if (!persist) {
+      return injector.getInstance(NoOpTransactionStateStorage.class);
+    }
+    Key<TransactionStateStorage> persistentStorageKey =
+      Key.get(TransactionStateStorage.class, Names.named("persist"));
+    return injector.getInstance(persistentStorageKey);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java
new file mode 100644
index 0000000..da3e019
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Guice module that binds {@link ZKClient} to {@link ZKClientService} and supplies the singleton
+ * {@link ZKClientService} instance.
+ */
+public class ZKModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    // The ZKClientService instance itself comes from the provider method
+    // provideZKClientService(org.apache.hadoop.conf.Configuration) declared in this module.
+    bind(ZKClient.class).to(ZKClientService.class);
+  }
+
+  /**
+   * Builds the ZooKeeper client service from configuration.
+   *
+   * <p>The quorum is read from {@link TxConstants.Service#CFG_DATA_TX_ZOOKEEPER_QUORUM}; when that is not set,
+   * the value of {@link TxConstants.HBase#ZOOKEEPER_QUORUM} is used instead. The raw client is then wrapped with
+   * retry-on-failure (exponential-delay strategy configured with 500 and 2000 milliseconds) and
+   * re-watch-on-expire behavior before being exposed as a {@link ZKClientService}.</p>
+   *
+   * @param conf configuration holding the quorum and session timeout settings
+   * @return the wrapped client service
+   */
+  @Provides
+  @Singleton
+  private ZKClientService provideZKClientService(Configuration conf) {
+    String quorum = conf.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
+    if (quorum == null) {
+      // Default to HBase one.
+      quorum = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM);
+    }
+    int sessionTimeout =
+      conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT);
+
+    ZKClientService rawClient =
+      new TephraZKClientService(quorum, sessionTimeout, null, ArrayListMultimap.<String, byte[]>create());
+    ZKClient retryingClient =
+      ZKClients.retryOnFailure(rawClient, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS));
+    ZKClient reWatchingClient = ZKClients.reWatchOnExpire(retryingClient);
+    return ZKClientServices.delegate(reWatchingClient);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java
new file mode 100644
index 0000000..a1cd6dd
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A decoder to help read snapshots in binary format.
+ *
+ * <p>Integers are stored as zig-zag values split into 7-bit groups, least significant group first; every byte
+ * except the last has its high bit set. The decoder reads directly from the wrapped stream without buffering.</p>
+ */
+public final class BinaryDecoder {
+
+  /** Shift of the fifth (last permitted) byte of an encoded int; a continuation byte here is malformed. */
+  private static final int MAX_INT_SHIFT = 28;
+
+  /** Shift of the tenth (last permitted) byte of an encoded long; a continuation byte here is malformed. */
+  private static final int MAX_LONG_SHIFT = 63;
+
+  private final InputStream input;
+
+  /**
+   * @param input Stream to read from.
+   */
+  public BinaryDecoder(InputStream input) {
+    this.input = input;
+  }
+
+  /**
+   * Read one int from the input.
+   * @return the read number
+   * @throws java.io.IOException If there is IO error, or the encoding is longer than five bytes.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  public int readInt() throws IOException {
+    int val = 0;
+    int shift = 0;
+    int b = readByte();
+    while (b > 0x7f) {
+      if (shift >= MAX_INT_SHIFT) {
+        // Without this guard the shift distance wraps around and a corrupt stream yields a garbage value.
+        throw new IOException("Invalid int encoding");
+      }
+      val ^= (b & 0x7f) << shift;
+      shift += 7;
+      b = readByte();
+    }
+    val ^= b << shift;
+    // Undo the zig-zag mapping: the lowest bit carries the sign.
+    return (val >>> 1) ^ -(val & 1);
+  }
+
+  /**
+   * Read one long int from the input.
+   * @return the read number
+   * @throws java.io.IOException If there is IO error, or the encoding is longer than ten bytes.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  public long readLong() throws IOException {
+    long val = 0;
+    int shift = 0;
+    int b = readByte();
+    while (b > 0x7f) {
+      if (shift >= MAX_LONG_SHIFT) {
+        // Without this guard the shift distance wraps around and a corrupt stream yields a garbage value.
+        throw new IOException("Invalid long encoding");
+      }
+      val ^= (long) (b & 0x7f) << shift;
+      shift += 7;
+      b = readByte();
+    }
+    val ^= (long) b << shift;
+    // Undo the zig-zag mapping: the lowest bit carries the sign.
+    return (val >>> 1) ^ -(val & 1);
+  }
+
+  /**
+   * Read a byte sequence. First read an int to indicate how many bytes to read, then that many bytes.
+   * @return the read bytes as a byte array
+   * @throws java.io.IOException If there is IO error, or the decoded length is negative.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  public byte[] readBytes() throws IOException {
+    int toRead = readInt();
+    if (toRead < 0) {
+      // Report corrupt input as an IO problem instead of letting the allocation throw
+      // NegativeArraySizeException.
+      throw new IOException("Invalid byte sequence length: " + toRead);
+    }
+    byte[] bytes = new byte[toRead];
+    // A single read may return fewer bytes than requested, so keep reading until the array is full.
+    while (toRead > 0) {
+      int byteRead = input.read(bytes, bytes.length - toRead, toRead);
+      if (byteRead == -1) {
+        throw new EOFException();
+      }
+      toRead -= byteRead;
+    }
+    return bytes;
+  }
+
+  /**
+   * Reads a single byte value.
+   *
+   * @return The byte value read, in the range 0 to 255.
+   * @throws java.io.IOException If there is IO error.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  private int readByte() throws IOException {
+    int b = input.read();
+    if (b == -1) {
+      throw new EOFException();
+    }
+    return b;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java
new file mode 100644
index 0000000..c4ce0a8
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An encoder to help encode snapshots in binary format.
+ *
+ * <p>Numbers are first mapped to their zig-zag value and then emitted in 7-bit groups, least significant group
+ * first, with the high bit set on every byte except the last. All write methods return this encoder so that
+ * calls can be chained.</p>
+ */
+public final class BinaryEncoder {
+
+  private final OutputStream output;
+
+  /**
+   * @param output stream to write to
+   */
+  public BinaryEncoder(OutputStream output) {
+    this.output = output;
+  }
+
+  /**
+   * write a single int value.
+   * @param i the value to write
+   * @return this encoder
+   * @throws java.io.IOException If there is IO error.
+   */
+  public BinaryEncoder writeInt(int i) throws IOException {
+    // Zig-zag: double the value, and flip all bits when the input is negative.
+    int remaining = (i << 1) ^ (i >> 31);
+
+    // Emit continuation bytes while anything above the low seven bits is still set.
+    while ((remaining & ~0x7f) != 0) {
+      output.write((remaining & 0x7f) | 0x80);
+      remaining >>>= 7;
+    }
+    output.write(remaining);
+
+    return this;
+  }
+
+  /**
+   * write a single long int value.
+   * @param l the value to write
+   * @return this encoder
+   * @throws java.io.IOException If there is IO error.
+   */
+  public BinaryEncoder writeLong(long l) throws IOException {
+    // Zig-zag: double the value, and flip all bits when the input is negative.
+    long remaining = (l << 1) ^ (l >> 63);
+
+    // Emit continuation bytes while anything above the low seven bits is still set.
+    while ((remaining & ~0x7fL) != 0) {
+      output.write((int) ((remaining & 0x7f) | 0x80));
+      remaining >>>= 7;
+    }
+    output.write((int) remaining);
+
+    return this;
+  }
+
+  /**
+   * write a sequence of bytes. First writes the number of bytes, then the bytes themselves.
+   * The length is emitted through {@link #writeLong(long)}; for an array length this produces the same bytes
+   * as {@link #writeInt(int)} would.
+   * @param bytes the bytes to write
+   * @return this encoder
+   * @throws java.io.IOException If there is IO error.
+   */
+  public BinaryEncoder writeBytes(byte[] bytes) throws IOException {
+    int length = bytes.length;
+    writeLong(length);
+    output.write(bytes, 0, length);
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java
new file mode 100644
index 0000000..4a94c74
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Handles serialization/deserialization of a {@link TransactionSnapshot}
+ * and its elements using {@link BinaryEncoder} and {@link BinaryDecoder}. This codec reports version 1.
+ *
+ * <p>Every collection is written as a block: the element count, the elements, and a terminating zero count.
+ * An empty collection is written as the zero count alone. When reading, blocks are consumed until a zero
+ * count is found.</p>
+ *
+ * @deprecated This codec is now deprecated and is replaced by {@link SnapshotCodecV2}.
+ */
+@Deprecated
+public class DefaultSnapshotCodec implements SnapshotCodec {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultSnapshotCodec.class);
+
+  @Override
+  public int getVersion() {
+    return 1;
+  }
+
+  /**
+   * Writes timestamp, read pointer, write pointer, the invalid list, the in-progress map, and the committing
+   * and committed change sets, in that order. An {@link IOException} is logged and rethrown unchecked.
+   */
+  @Override
+  public void encode(OutputStream out, TransactionSnapshot snapshot) {
+    try {
+      BinaryEncoder encoder = new BinaryEncoder(out);
+
+      encoder.writeLong(snapshot.getTimestamp())
+        .writeLong(snapshot.getReadPointer())
+        .writeLong(snapshot.getWritePointer());
+      encodeInvalid(encoder, snapshot.getInvalid());
+      encodeInProgress(encoder, snapshot.getInProgress());
+      encodeChangeSets(encoder, snapshot.getCommittingChangeSets());
+      encodeChangeSets(encoder, snapshot.getCommittedChangeSets());
+
+    } catch (IOException e) {
+      LOG.error("Unable to serialize transaction state: ", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Reads the visibility state first, then the committing and committed change sets from the same stream.
+   * An {@link IOException} is logged and rethrown unchecked.
+   */
+  @Override
+  public TransactionSnapshot decode(InputStream in) {
+    BinaryDecoder decoder = new BinaryDecoder(in);
+
+    try {
+      TransactionVisibilityState visibility = decodeTransactionVisibilityState(in);
+      NavigableMap<Long, Set<ChangeId>> committing = decodeChangeSets(decoder);
+      NavigableMap<Long, Set<ChangeId>> committed = decodeChangeSets(decoder);
+      return new TransactionSnapshot(visibility.getTimestamp(), visibility.getReadPointer(),
+                                     visibility.getWritePointer(), visibility.getInvalid(),
+                                     visibility.getInProgress(), committing, committed);
+    } catch (IOException e) {
+      LOG.error("Unable to deserialize transaction state: ", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * Reads timestamp, read pointer, write pointer, the invalid list, and the in-progress map, in that order.
+   * An {@link IOException} is logged and rethrown unchecked.
+   */
+  @Override
+  public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
+    BinaryDecoder decoder = new BinaryDecoder(in);
+    try {
+      long timestamp = decoder.readLong();
+      long readPointer = decoder.readLong();
+      long writePointer = decoder.readLong();
+      Collection<Long> invalid = decodeInvalid(decoder);
+      NavigableMap<Long, TransactionManager.InProgressTx> inProgress = decodeInProgress(decoder);
+      return new TransactionSnapshot(timestamp, readPointer, writePointer, invalid, inProgress);
+    } catch (IOException e) {
+      LOG.error("Unable to deserialize transaction state: ", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /** Writes the invalid transaction ids as one block followed by the terminating zero. */
+  private void encodeInvalid(BinaryEncoder encoder, Collection<Long> invalid) throws IOException {
+    if (invalid.isEmpty()) {
+      encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+      return;
+    }
+    encoder.writeInt(invalid.size());
+    for (Long txId : invalid) {
+      encoder.writeLong(txId);
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /** Reads blocks of invalid transaction ids until a zero count is found. */
+  private Collection<Long> decodeInvalid(BinaryDecoder decoder) throws IOException {
+    int count = decoder.readInt();
+    Collection<Long> invalid = Lists.newArrayListWithCapacity(count);
+    // zero denotes end of list as per AVRO spec
+    for (; count != 0; count = decoder.readInt()) {
+      for (int i = 0; i < count; i++) {
+        invalid.add(decoder.readLong());
+      }
+    }
+    return invalid;
+  }
+
+  /**
+   * Writes each in-progress entry as transaction id, expiration, and visibility upper bound, as one block
+   * followed by the terminating zero.
+   */
+  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
+    throws IOException {
+
+    if (inProgress.isEmpty()) {
+      encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+      return;
+    }
+    encoder.writeInt(inProgress.size());
+    for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+      TransactionManager.InProgressTx tx = entry.getValue();
+      encoder.writeLong(entry.getKey()); // tx id
+      encoder.writeLong(tx.getExpiration());
+      encoder.writeLong(tx.getVisibilityUpperBound());
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /**
+   * Reads blocks of in-progress entries (transaction id, expiration, visibility upper bound) until a zero
+   * count is found.
+   */
+  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
+    throws IOException {
+
+    int count = decoder.readInt();
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+    // zero denotes end of list as per AVRO spec
+    for (; count != 0; count = decoder.readInt()) {
+      for (int i = 0; i < count; i++) {
+        long txId = decoder.readLong();
+        long expiration = decoder.readLong();
+        long visibilityUpperBound = decoder.readLong();
+        inProgress.put(txId, new TransactionManager.InProgressTx(visibilityUpperBound, expiration));
+      }
+    }
+    return inProgress;
+  }
+
+  /** Writes each entry as its key followed by its set of changes, as one block plus the terminating zero. */
+  private void encodeChangeSets(BinaryEncoder encoder, Map<Long, Set<ChangeId>> changes) throws IOException {
+    if (changes.isEmpty()) {
+      encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+      return;
+    }
+    encoder.writeInt(changes.size());
+    for (Map.Entry<Long, Set<ChangeId>> entry : changes.entrySet()) {
+      encoder.writeLong(entry.getKey());
+      encodeChanges(encoder, entry.getValue());
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /** Reads blocks of (key, change set) entries until a zero count is found. */
+  private NavigableMap<Long, Set<ChangeId>> decodeChangeSets(BinaryDecoder decoder) throws IOException {
+    int count = decoder.readInt();
+    NavigableMap<Long, Set<ChangeId>> changeSets = new TreeMap<Long, Set<ChangeId>>();
+    // zero denotes end of list as per AVRO spec
+    for (; count != 0; count = decoder.readInt()) {
+      for (int i = 0; i < count; i++) {
+        long key = decoder.readLong();
+        changeSets.put(key, decodeChanges(decoder));
+      }
+    }
+    return changeSets;
+  }
+
+  /** Writes the key bytes of every change as one block followed by the terminating zero. */
+  private void encodeChanges(BinaryEncoder encoder, Set<ChangeId> changes) throws IOException {
+    if (changes.isEmpty()) {
+      encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+      return;
+    }
+    encoder.writeInt(changes.size());
+    for (ChangeId change : changes) {
+      encoder.writeBytes(change.getKey());
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /** Reads blocks of change keys until a zero count is found. */
+  private Set<ChangeId> decodeChanges(BinaryDecoder decoder) throws IOException {
+    int count = decoder.readInt();
+    HashSet<ChangeId> changes = Sets.newHashSetWithExpectedSize(count);
+    // zero denotes end of list as per AVRO spec
+    for (; count != 0; count = decoder.readInt()) {
+      for (int i = 0; i < count; i++) {
+        changes.add(new ChangeId(decoder.readBytes()));
+      }
+    }
+    // todo is there an immutable hash set?
+    return changes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java
new file mode 100644
index 0000000..e2c5c16
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Interface to decode and encode a transaction snapshot. Each codec implements one version of the encoding.
+ * It need not include the version when encoding the snapshot.
+ *
+ * <p>None of the methods declares a checked exception, so implementations have to report stream failures
+ * through unchecked exceptions.</p>
+ */
+public interface SnapshotCodec {
+
+ /**
+ * Returns the version number that identifies the encoding produced and understood by this codec.
+ * @return the version of the encoding implemented by the codec.
+ */
+ int getVersion();
+
+ /**
+ * Encode a transaction snapshot into an output stream.
+ * @param out the output stream to write to
+ * @param snapshot the snapshot to encode
+ */
+ void encode(OutputStream out, TransactionSnapshot snapshot);
+
+ /**
+ * Decode a complete transaction snapshot from an input stream.
+ * @param in the input stream to read from
+ * @return the decoded snapshot
+ */
+ TransactionSnapshot decode(InputStream in);
+
+ /**
+ * Decode only the transaction visibility state from an input stream.
+ * @param in the input stream to read from
+ * @return the decoded {@link TransactionVisibilityState}
+ */
+ TransactionVisibilityState decodeTransactionVisibilityState(InputStream in);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java
new file mode 100644
index 0000000..3756846
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import javax.annotation.Nonnull;
+
+/**
+ * Maintains the codecs for all known versions of the transaction snapshot encoding.
+ *
+ * <p>When encoding, the version of the current (highest-versioned) codec is written ahead of the snapshot.
+ * When decoding, that version is read first and used to pick the matching codec.</p>
+ */
+public class SnapshotCodecProvider implements SnapshotCodec {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotCodecProvider.class);
+
+  /** Registered codecs keyed by version; the last key is the current version. */
+  private final SortedMap<Integer, SnapshotCodec> codecs = Maps.newTreeMap();
+
+  /**
+   * @param configuration configuration naming the codec classes to register
+   */
+  @Inject
+  public SnapshotCodecProvider(Configuration configuration) {
+    initialize(configuration);
+  }
+
+  /**
+   * Register all codec specified in the configuration with this provider.
+   * There can only be one codec for a given version.
+   *
+   * <p>Classes that cannot be loaded or instantiated are logged and skipped. If no configured class could be
+   * loaded, the default codec classes are registered instead.</p>
+   */
+  private void initialize(Configuration configuration) {
+    List<Class> codecClasses = Lists.newArrayList();
+    String[] codecClassNames = configuration.getTrimmedStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+    if (codecClassNames != null) {
+      for (String clsName : codecClassNames) {
+        try {
+          codecClasses.add(Class.forName(clsName));
+        } catch (ClassNotFoundException cnfe) {
+          LOG.warn("Unable to load class configured for " + TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES
+                     + ": " + clsName, cnfe);
+        }
+      }
+    }
+
+    if (codecClasses.isEmpty()) {
+      codecClasses.addAll(Arrays.asList(TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES));
+    }
+
+    for (Class<?> codecClass : codecClasses) {
+      try {
+        SnapshotCodec codec = (SnapshotCodec) (codecClass.newInstance());
+        codecs.put(codec.getVersion(), codec);
+        LOG.debug("Using snapshot codec {} for snapshots of version {}", codecClass.getName(), codec.getVersion());
+      } catch (Exception e) {
+        LOG.warn("Error instantiating snapshot codec {}. Skipping.", codecClass.getName(), e);
+      }
+    }
+  }
+
+  /**
+   * Retrieve the codec for a particular version of the encoding.
+   * @param version the version of interest
+   * @return the corresponding codec
+   * @throws java.lang.IllegalArgumentException if the version is not known
+   */
+  @Nonnull
+  @VisibleForTesting
+  SnapshotCodec getCodecForVersion(int version) {
+    SnapshotCodec codec = codecs.get(version);
+    if (codec != null) {
+      return codec;
+    }
+    throw new IllegalArgumentException(String.format("Version %d of snapshot encoding is not supported", version));
+  }
+
+  /**
+   * Retrieve the current snapshot codec, that is, the codec with the highest known version.
+   * @return the current codec
+   * @throws java.lang.IllegalStateException if no codecs are registered
+   */
+  private SnapshotCodec getCurrentCodec() {
+    if (codecs.isEmpty()) {
+      throw new IllegalStateException("No codecs are registered.");
+    }
+    return codecs.get(codecs.lastKey());
+  }
+
+  /**
+   * Reads the persisted version from the stream and returns the codec registered for it.
+   * An {@link IOException} while reading the version is logged and rethrown unchecked.
+   */
+  private SnapshotCodec getCodec(InputStream in) {
+    try {
+      return getCodecForVersion(new BinaryDecoder(in).readInt());
+    } catch (IOException e) {
+      LOG.error("Unable to read transaction state version: ", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  /**
+   * @return the version of the current codec
+   */
+  @Override
+  public int getVersion() {
+    return getCurrentCodec().getVersion();
+  }
+
+  @Override
+  public TransactionSnapshot decode(InputStream in) {
+    SnapshotCodec codec = getCodec(in);
+    return codec.decode(in);
+  }
+
+  @Override
+  public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
+    SnapshotCodec codec = getCodec(in);
+    return codec.decodeTransactionVisibilityState(in);
+  }
+
+  /**
+   * Writes the current codec's version, then lets that codec encode the snapshot.
+   * An {@link IOException} while writing the version is logged and rethrown unchecked.
+   */
+  @Override
+  public void encode(OutputStream out, TransactionSnapshot snapshot) {
+    SnapshotCodec current = getCurrentCodec();
+    try {
+      new BinaryEncoder(out).writeInt(current.getVersion());
+    } catch (IOException e) {
+      LOG.error("Unable to write transaction state version: ", e);
+      throw Throwables.propagate(e);
+    }
+    current.encode(out, snapshot);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
new file mode 100644
index 0000000..ccf026d
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionSnapshot;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+/**
+ * Handles serialization/deserialization of a {@link TransactionSnapshot}
+ * and its elements to {@code byte[]}.
+ *
+ * <p>Compared to {@link DefaultSnapshotCodec}, each in-progress entry additionally carries the ordinal of its
+ * {@link TransactionType}. This codec reports version 2.</p>
+ */
+public class SnapshotCodecV2 extends DefaultSnapshotCodec {
+  @Override
+  public int getVersion() {
+    return 2;
+  }
+
+  /**
+   * Writes each in-progress entry as transaction id, expiration, visibility upper bound, and transaction type
+   * ordinal, as one block followed by the terminating zero count.
+   *
+   * @param encoder the encoder to write to
+   * @param inProgress the in-progress transactions keyed by transaction id
+   * @throws IOException if writing fails
+   */
+  @Override
+  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
+    throws IOException {
+
+    if (!inProgress.isEmpty()) {
+      encoder.writeInt(inProgress.size());
+      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+        encoder.writeLong(entry.getKey()); // tx id
+        encoder.writeLong(entry.getValue().getExpiration());
+        encoder.writeLong(entry.getValue().getVisibilityUpperBound());
+        encoder.writeInt(entry.getValue().getType().ordinal());
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /**
+   * Reads blocks of in-progress entries (transaction id, expiration, visibility upper bound, transaction type
+   * ordinal) until a zero count is found. Every decoded entry is given a new, empty {@link LongArrayList}.
+   *
+   * @param decoder the decoder to read from
+   * @return the decoded in-progress transactions keyed by transaction id
+   * @throws IOException if reading fails or a transaction type ordinal is outside the range of
+   *                     {@link TransactionType} constants
+   */
+  @Override
+  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
+    throws IOException {
+
+    // values() copies the constant array on every call, so fetch it once for the whole decode.
+    TransactionType[] txTypes = TransactionType.values();
+
+    int size = decoder.readInt();
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        long txId = decoder.readLong();
+        long expiration = decoder.readLong();
+        long visibilityUpperBound = decoder.readLong();
+        int txTypeIdx = decoder.readInt();
+        // Validate the ordinal explicitly rather than relying on an ArrayIndexOutOfBoundsException.
+        if (txTypeIdx < 0 || txTypeIdx >= txTypes.length) {
+          throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
+        }
+        TransactionType txType = txTypes[txTypeIdx];
+        inProgress.put(txId,
+                       new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType,
+                                                           new LongArrayList()));
+      }
+      size = decoder.readInt();
+    }
+    return inProgress;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java
new file mode 100644
index 0000000..1b9e2b3
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import org.apache.tephra.persist.TransactionSnapshot;
+
+/**
+ * Codec for serializing/deserializing a {@link TransactionSnapshot} and its elements to {@code byte[]},
+ * reporting version {@code 3}.
+ *
+ * <p>This class overrides nothing but the version number, so the bytes it reads and writes are the same as those
+ * of {@link SnapshotCodecV2}. The distinct version number exists to allow easy migration from projects using
+ * deprecated codecs with conflicting version numbers.</p>
+ */
+public class SnapshotCodecV3 extends SnapshotCodecV2 {
+  /** The version number reported by this codec. */
+  private static final int VERSION = 3;
+
+  /**
+   * Returns the version number of this codec.
+   */
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
new file mode 100644
index 0000000..cadaa8e
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionSnapshot;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+/**
+ * Handles serialization/deserialization of a {@link TransactionSnapshot}
+ * and its elements to {@code byte[]}.
+ *
+ * <p>This codec reports version {@code 4}. For each in-progress transaction it writes the transaction id,
+ * expiration, visibility upper bound and type, followed by the transaction's checkpoint write pointers.</p>
+ */
+public class SnapshotCodecV4 extends SnapshotCodecV2 {
+  /**
+   * Returns the version number of this codec.
+   */
+  @Override
+  public int getVersion() {
+    return 4;
+  }
+
+  /**
+   * Encodes the in-progress transactions.
+   *
+   * <p>A non-empty map is written as one block: the entry count, then for each entry the tx id, expiration,
+   * visibility upper bound, type ordinal and a zero-terminated list of checkpoint write pointers. The whole
+   * sequence is terminated by a zero, which is also the complete encoding of an empty map.</p>
+   *
+   * @param encoder the encoder to write to
+   * @param inProgress the in-progress transactions, keyed by transaction id
+   * @throws IOException if writing fails
+   */
+  @Override
+  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
+    throws IOException {
+
+    if (!inProgress.isEmpty()) {
+      encoder.writeInt(inProgress.size());
+      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+        TransactionManager.InProgressTx tx = entry.getValue();
+        encoder.writeLong(entry.getKey()); // tx id
+        encoder.writeLong(tx.getExpiration());
+        encoder.writeLong(tx.getVisibilityUpperBound());
+        encoder.writeInt(tx.getType().ordinal());
+        // write checkpoint tx IDs; a missing or empty list is written as just the terminating zero
+        LongArrayList checkpointPointers = tx.getCheckpointWritePointers();
+        if (checkpointPointers != null && !checkpointPointers.isEmpty()) {
+          encoder.writeInt(checkpointPointers.size());
+          for (int i = 0; i < checkpointPointers.size(); i++) {
+            encoder.writeLong(checkpointPointers.getLong(i));
+          }
+        }
+        encoder.writeInt(0); // zero denotes end of the checkpoint list
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /**
+   * Decodes the map of in-progress transactions written by
+   * {@link #encodeInProgress(BinaryEncoder, Map)}, including each transaction's checkpoint write pointers.
+   *
+   * @param decoder the decoder to read from
+   * @return the decoded transactions, sorted by transaction id
+   * @throws IOException if reading fails, a transaction type ordinal is not a valid {@link TransactionType},
+   *                     or the leading checkpoint pointer count of a transaction is negative
+   */
+  @Override
+  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
+    throws IOException {
+
+    int size = decoder.readInt();
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+    // values() returns a fresh array on every call, so fetch it once for all entries
+    TransactionType[] txTypes = TransactionType.values();
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        long txId = decoder.readLong();
+        long expiration = decoder.readLong();
+        long visibilityUpperBound = decoder.readLong();
+        int txTypeIdx = decoder.readInt();
+        // validate explicitly instead of relying on ArrayIndexOutOfBoundsException for control flow
+        if (txTypeIdx < 0 || txTypeIdx >= txTypes.length) {
+          throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
+        }
+        // read checkpoint tx IDs
+        int checkpointPointerSize = decoder.readInt();
+        // The count is used to pre-size the list below; a negative value would make the list constructor
+        // throw an unchecked exception, so report corrupt input as an IOException instead.
+        if (checkpointPointerSize < 0) {
+          throw new IOException("Checkpoint write pointer count is negative: " + checkpointPointerSize);
+        }
+        LongArrayList checkpointPointers = new LongArrayList(checkpointPointerSize);
+        while (checkpointPointerSize != 0) {
+          for (int checkpointRemaining = checkpointPointerSize; checkpointRemaining > 0; --checkpointRemaining) {
+            checkpointPointers.add(decoder.readLong());
+          }
+          checkpointPointerSize = decoder.readInt();
+        }
+        inProgress.put(txId,
+                       new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txTypes[txTypeIdx],
+                                                           checkpointPointers));
+      }
+      size = decoder.readInt();
+    }
+    return inProgress;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java
new file mode 100644
index 0000000..cf1a276
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains interfaces and implementations for encoding and decoding transaction snapshots.
+ */
+package org.apache.tephra.snapshot;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
new file mode 100644
index 0000000..a13668c
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.inject.Provider;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A {@link Provider} of {@code org.apache.hadoop.conf.Configuration} instances, constructed by the correct
+ * version used for the runtime.
+ *
+ * <p>All calls are delegated to a {@link ConfigurationProvider} that is obtained once, when this factory is
+ * created, from an {@link HBaseVersionSpecificFactory} that names one provider class per HBase version.</p>
+ */
+public class ConfigurationFactory implements Provider<Configuration> {
+  private static final String HBASE_96_PROVIDER = "org.apache.tephra.hbase96.HBase96ConfigurationProvider";
+  private static final String HBASE_98_PROVIDER = "org.apache.tephra.hbase98.HBase98ConfigurationProvider";
+  private static final String HBASE_10_PROVIDER = "org.apache.tephra.hbase10.HBase10ConfigurationProvider";
+  private static final String HBASE_11_PROVIDER = "org.apache.tephra.hbase11.HBase11ConfigurationProvider";
+  private static final String HBASE_10_CDH_PROVIDER = "org.apache.tephra.hbase10cdh.HBase10ConfigurationProvider";
+
+  private final ConfigurationProvider provider = new ConfigurationProviderFactory().get();
+
+  /**
+   * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
+   */
+  @Override
+  public Configuration get() {
+    return provider.get();
+  }
+
+  /**
+   * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
+   *
+   * @param baseConf additional configuration properties to merge on to the classpath configuration
+   * @return the merged configuration
+   */
+  public Configuration get(Configuration baseConf) {
+    return provider.get(baseConf);
+  }
+
+  /**
+   * Supplies the class name of the {@link ConfigurationProvider} implementation for each HBase version.
+   */
+  private static class ConfigurationProviderFactory extends HBaseVersionSpecificFactory<ConfigurationProvider> {
+    @Override
+    protected String getHBase96Classname() {
+      return HBASE_96_PROVIDER;
+    }
+
+    @Override
+    protected String getHBase98Classname() {
+      return HBASE_98_PROVIDER;
+    }
+
+    @Override
+    protected String getHBase10Classname() {
+      return HBASE_10_PROVIDER;
+    }
+
+    @Override
+    protected String getHBase11Classname() {
+      return HBASE_11_PROVIDER;
+    }
+
+    @Override
+    protected String getHBase10CDHClassname() {
+      return HBASE_10_CDH_PROVIDER;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java
new file mode 100644
index 0000000..e133c69
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.inject.Provider;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provides {@code Configuration} instances, constructed by the HBase version on which we are running.
+ *
+ * <p>Besides the no-argument {@link #get()} required by {@link Provider}, implementations must also supply
+ * {@link #get(Configuration)}, which takes a base configuration.</p>
+ */
+public abstract class ConfigurationProvider implements Provider<Configuration> {
+ /**
+ * Returns a {@code Configuration} instance.
+ */
+ @Override
+ public abstract Configuration get();
+
+ /**
+ * Returns a {@code Configuration} instance for the given base configuration.
+ *
+ * <p>NOTE(review): how {@code baseConf} is combined is up to the implementation, which is not visible here.
+ * {@code ConfigurationFactory#get(Configuration)}, which delegates to this method, documents the result as
+ * {@code baseConf} merged on to the classpath configuration. Confirm against the implementations.</p>
+ *
+ * @param baseConf the base configuration to use
+ * @return a {@code Configuration} instance
+ */
+ public abstract Configuration get(Configuration baseConf);
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
new file mode 100644
index 0000000..687e46d
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Detects the currently loaded HBase version. It is assumed that only one HBase version is loaded at a time,
+ * since using more than one HBase version within the same process will require classloader isolation anyway.
+ */
+public class HBaseVersion {
+ private static final String HBASE_94_VERSION = "0.94";
+ private static final String HBASE_96_VERSION = "0.96";
+ private static final String HBASE_98_VERSION = "0.98";
+ private static final String HBASE_10_VERSION = "1.0";
+ private static final String HBASE_11_VERSION = "1.1";
+ private static final String HBASE_12_VERSION = "1.2";
+ private static final String CDH_CLASSIFIER = "cdh";
+
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseVersion.class);
+
+ /**
+ * Represents the major version of the HBase library that is currently loaded.
+ */
+ public enum Version {
+ HBASE_94("0.94"),
+ HBASE_96("0.96"),
+ HBASE_98("0.98"),
+ HBASE_10("1.0"),
+ HBASE_10_CDH("1.0-cdh"),
+ HBASE_11("1.1"),
+ HBASE_12_CDH("1.2-cdh"),
+ UNKNOWN("unknown");
+
+ final String majorVersion;
+
+ Version(String majorVersion) {
+ this.majorVersion = majorVersion;
+ }
+
+ public String getMajorVersion() {
+ return majorVersion;
+ }
+ }
+
+ private static Version currentVersion;
+ private static String versionString;
+ static {
+ try {
+ Class versionInfoClass = Class.forName("org.apache.hadoop.hbase.util.VersionInfo");
+ Method versionMethod = versionInfoClass.getMethod("getVersion");
+ versionString = (String) versionMethod.invoke(null);
+ if (versionString.startsWith(HBASE_94_VERSION)) {
+ currentVersion = Version.HBASE_94;
+ } else if (versionString.startsWith(HBASE_96_VERSION)) {
+ currentVersion = Version.HBASE_96;
+ } else if (versionString.startsWith(HBASE_98_VERSION)) {
+ currentVersion = Version.HBASE_98;
+ } else if (versionString.startsWith(HBASE_10_VERSION)) {
+ VersionNumber ver = VersionNumber.create(versionString);
+ if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
+ currentVersion = Version.HBASE_10_CDH;
+ } else {
+ currentVersion = Version.HBASE_10;
+ }
+ } else if (versionString.startsWith(HBASE_11_VERSION)) {
+ currentVersion = Version.HBASE_11;
+ } else if (versionString.startsWith(HBASE_12_VERSION)) {
+ VersionNumber ver = VersionNumber.create(versionString);
+ if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
+ currentVersion = Version.HBASE_12_CDH;
+ } else {
+ // CDH 5.7 comes with HBase version 1.2.0-CDH5.7.0. However currently there is no
+ // other hadoop distribution that uses HBase 1.2, so the version is set here to UNKNOWN.
+ currentVersion = Version.UNKNOWN;
+ }
+ } else {
+ currentVersion = Version.UNKNOWN;
+ }
+ } catch (Throwable e) {
+ // must be a class loading exception, HBase is not there
+ LOG.error("Unable to determine HBase version from string '{}', are HBase classes available?", versionString);
+ LOG.error("Exception was: ", e);
+ currentVersion = Version.UNKNOWN;
+ }
+ }
+
+ /**
+ * Returns the major version of the currently loaded HBase library.
+ */
+ public static Version get() {
+ return currentVersion;
+ }
+
+ /**
+ * Returns the full version string for the currently loaded HBase library.
+ */
+ public static String getVersionString() {
+ return versionString;
+ }
+
+ /**
+ * Prints out the HBase {@link Version} enum value for the current version of HBase on the classpath.
+ */
+ public static void main(String[] args) {
+ boolean verbose = args.length == 1 && "-v".equals(args[0]);
+ Version version = HBaseVersion.get();
+ System.out.println(version.getMajorVersion());
+ if (verbose) {
+ System.out.println("versionString=" + getVersionString());
+ }
+ }
+
+ /**
+ * Utility class to parse apart version number components. The version string provided is expected to be in
+ * the format: major[.minor[.patch[.last]][-classifier][-SNAPSHOT]
+ *
+ * <p>Only the major version number is actually required.</p>
+ */
+ public static class VersionNumber {
+ private static final Pattern PATTERN =
+ Pattern.compile("(\\d+)(\\.(\\d+))?(\\.(\\d+))?(\\.(\\d+))?(\\-(?!SNAPSHOT)([^\\-]+))?(\\-SNAPSHOT)?");
+
+ private Integer major;
+ private Integer minor;
+ private Integer patch;
+ private Integer last;
+ private String classifier;
+ private boolean snapshot;
+
+ private VersionNumber(Integer major, Integer minor, Integer patch, Integer last,
+ String classifier, boolean snapshot) {
+ this.major = major;
+ this.minor = minor;
+ this.patch = patch;
+ this.last = last;
+ this.classifier = classifier;
+ this.snapshot = snapshot;
+ }
+
+ public Integer getMajor() {
+ return major;
+ }
+
+ public Integer getMinor() {
+ return minor;
+ }
+
+ public Integer getPatch() {
+ return patch;
+ }
+
+ public Integer getLast() {
+ return last;
+ }
+
+ public String getClassifier() {
+ return classifier;
+ }
+
+ public boolean isSnapshot() {
+ return snapshot;
+ }
+
+ public static VersionNumber create(String versionString) throws ParseException {
+ Matcher matcher = PATTERN.matcher(versionString);
+ if (matcher.matches()) {
+ String majorString = matcher.group(1);
+ String minorString = matcher.group(3);
+ String patchString = matcher.group(5);
+ String last = matcher.group(7);
+ String classifier = matcher.group(9);
+ String snapshotString = matcher.group(10);
+ return new VersionNumber(new Integer(majorString),
+ minorString != null ? new Integer(minorString) : null,
+ patchString != null ? new Integer(patchString) : null,
+ last != null ? new Integer(last) : null,
+ classifier,
+ "-SNAPSHOT".equals(snapshotString));
+ }
+ throw new ParseException(
+ "Input string did not match expected pattern: major[.minor[.patch]][-classifier][-SNAPSHOT]", 0);
+ }
+ }
+}