You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:42 UTC
[42/56] [abbrv] [partial] incubator-tephra git commit: Rename package
to org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java b/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java
deleted file mode 100644
index 432ab2f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.rpc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadedSelectorServerWithFix;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.utils.Networks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @param <T> The type of service handler interface.
- * @param <I> The type of the thrift service.
- */
-public final class ThriftRPCServer<T extends RPCServiceHandler, I> extends AbstractExecutionThreadService {
-
- private static final Logger LOG = LoggerFactory.getLogger(ThriftRPCServer.class);
-
- private final String name;
- private final int ioThreads;
- private final int workerThreads;
- private final int maxReadBufferBytes;
- private final T serviceHandler;
- private final TProcessor processor;
-
- private InetSocketAddress bindAddress;
- private ExecutorService executor;
- private TServer server;
-
- /**
- * Creates a {@link Builder} for creating instance of {@link ThriftRPCServer}.
- * @param serviceType Class of the thrift service.
- * @param <I> Type of the thrift service.
- * @return A {@link Builder}.
- */
- public static <I> Builder<I> builder(Class<I> serviceType) {
- return new Builder<I>(serviceType);
- }
-
- /**
- * Builder for creating instance of ThriftRPCServer. By default, the instance created will bind to
- * random port and with 2 io threads and worker threads equals to min(2, number of cpu cores - 2).
- */
- public static final class Builder<I> {
- private final Class<I> serviceType;
- private String name;
- private InetSocketAddress bindAddress = new InetSocketAddress(0);
- private int ioThreads = 2;
- private int workerThreads = Runtime.getRuntime().availableProcessors() - 2;
- // 16Mb
- private int maxReadBufferBytes = 16 * 1024 * 1024;
-
- private Builder(Class<I> serviceType) {
- this.serviceType = serviceType;
- this.name = serviceType.getSimpleName();
- }
-
- public Builder<I> setName(String name) {
- this.name = name;
- return this;
- }
-
- public Builder<I> setHost(String host) {
- this.bindAddress = new InetSocketAddress(host, bindAddress.getPort());
- return this;
- }
-
- public Builder<I> setPort(int port) {
- this.bindAddress = new InetSocketAddress(bindAddress.getHostName(), port);
- return this;
- }
-
- public Builder<I> setIOThreads(int count) {
- this.ioThreads = count;
- return this;
- }
-
- public Builder<I> setWorkerThreads(int count) {
- this.workerThreads = count;
- return this;
- }
-
- public Builder<I> setMaxReadBufferBytes(int maxReadBufferBytes) {
- this.maxReadBufferBytes = maxReadBufferBytes;
- return this;
- }
-
- public <T extends RPCServiceHandler> ThriftRPCServer<T, I> build(T serviceHandler) {
- return new ThriftRPCServer<T, I>(bindAddress, ioThreads, workerThreads, maxReadBufferBytes,
- serviceHandler, serviceType, name);
- }
- }
-
- /**
- * Creates a ThriftRPCServer with the given paramters.
- *
- * @param bindAddress The socket address for the server to listen on. If {@code null}, it'll be binded to random
- * port on localhost.
- * @param ioThreads Number of io threads.
- * @param workerThreads Number of worker threads.
- * @param serviceHandler Handler for handling client requests.
- */
- @SuppressWarnings("unchecked")
- private ThriftRPCServer(InetSocketAddress bindAddress, int ioThreads,
- int workerThreads, int maxReadBufferBytes,
- T serviceHandler, Class<I> serviceType, String name) {
- Preconditions.checkArgument(ioThreads > 0, "IO threads must be > 0.");
- Preconditions.checkArgument(workerThreads > 0, "Worker threads must be > 0.");
-
- this.bindAddress = bindAddress;
- this.ioThreads = ioThreads;
- this.workerThreads = workerThreads;
- this.maxReadBufferBytes = maxReadBufferBytes;
- this.serviceHandler = serviceHandler;
- this.name = name;
- this.processor = createProcessor((Class<T>) serviceHandler.getClass(), serviceType);
- }
-
- public InetSocketAddress getBindAddress() {
- return bindAddress;
- }
-
- @Override
- protected void startUp() throws Exception {
- // Determines the address and port to listen on
- InetSocketAddress listenOn = bindAddress;
- if (listenOn == null || listenOn.getPort() <= 0) {
- int port = Networks.getRandomPort();
- if (listenOn == null) {
- listenOn = new InetSocketAddress("localhost", port);
- } else {
- listenOn = new InetSocketAddress(listenOn.getAddress(), port);
- }
- }
- bindAddress = listenOn;
-
- executor = new ThreadPoolExecutor(0, workerThreads,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- Threads.createDaemonThreadFactory(String.format("%s-rpc-%%d", name)),
- new ThreadPoolExecutor.CallerRunsPolicy());
- serviceHandler.init();
-
- TThreadedSelectorServerWithFix.Args args =
- new TThreadedSelectorServerWithFix.Args(new TNonblockingServerSocket(listenOn))
- .selectorThreads(ioThreads)
- .protocolFactory(new TBinaryProtocol.Factory())
- .transportFactory(new TFramedTransport.Factory())
- .processor(processor)
- .executorService(executor);
-
- // ENG-443 - Set the max read buffer size. This is important as this will
- // prevent the server from throwing OOME if telnetd to the port
- // it's running on.
- args.maxReadBufferBytes = maxReadBufferBytes;
- server = new TThreadedSelectorServerWithFix(args);
- LOG.info("Starting RPC server for {}", name);
- }
-
- @Override
- protected void shutDown() throws Exception {
- serviceHandler.destroy();
- executor.shutdownNow();
- LOG.info("RPC server for {} stopped.", name);
- }
-
- @Override
- protected void triggerShutdown() {
- LOG.info("Request to stop RPC server for {}", name);
- server.stop();
- }
-
- @Override
- protected void run() throws Exception {
- LOG.info("Running RPC server for {}", name);
- server.serve();
- LOG.info("Done running RPC server for {}", name);
- }
-
- @SuppressWarnings("unchecked")
- private TProcessor createProcessor(final Class<T> handlerType, Class<I> serviceType) {
- // Pick the Iface inner interface and the Processor class
- Class<? extends TProcessor> processorType = null;
- Class<?> ifaceType = null;
- for (Class<?> clz : serviceType.getDeclaredClasses()) {
- if (TProcessor.class.isAssignableFrom(clz)) {
- processorType = (Class<? extends TProcessor>) clz;
- } else if (clz.isInterface() && "Iface".equals(clz.getSimpleName())) {
- ifaceType = clz;
- }
- }
-
- Preconditions.checkArgument(processorType != null,
- "Missing TProcessor, %s is not a valid thrift service.", serviceType.getName());
- Preconditions.checkArgument(ifaceType != null,
- "Missing Iface, %s is not a valid thrift service.", serviceType.getName());
-
- // If handler already implements the Iface, simply delegate
- if (ifaceType.isAssignableFrom(handlerType)) {
- return createProxyProcessor(handlerType, processorType, ifaceType);
- }
-
- throw new IllegalArgumentException("Unsupported handler type.");
- }
-
- private TProcessor createProxyProcessor(final Class<T> handlerType,
- Class<? extends TProcessor> processorType, Class<?> ifaceType) {
-
- try {
- // Map from Iface method to handlerType method to save reflection lookup
- ImmutableMap.Builder<Method, Method> builder = ImmutableMap.builder();
- for (Method method : ifaceType.getMethods()) {
- Method handlerMethod = handlerType.getMethod(method.getName(), method.getParameterTypes());
- if (!handlerMethod.isAccessible()) {
- handlerMethod.setAccessible(true);
- }
- builder.put(method, handlerMethod);
- }
- final Map<Method, Method> methods = builder.build();
-
- Object proxy = Proxy.newProxyInstance(ifaceType.getClassLoader(),
- new Class[]{ifaceType}, new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- try {
- return methods.get(method).invoke(serviceHandler, args);
- } catch (InvocationTargetException e) {
- if (e.getCause() != null) {
- throw e.getCause();
- } else {
- throw e;
- }
- }
- }
- });
-
- return processorType.getConstructor(ifaceType).newInstance(proxy);
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java b/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java
deleted file mode 100644
index 2b5bccd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package contains class for writing RPC server and client in simple manner.
- */
-package co.cask.tephra.rpc;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java
deleted file mode 100644
index 92df8cd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import com.google.inject.AbstractModule;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Provides Guice bindings for {@link Configuration}.
- */
-public final class ConfigModule extends AbstractModule {
-
- private final Configuration configuration;
-
- public ConfigModule(Configuration configuration) {
- this.configuration = configuration;
- }
-
- @Override
- protected void configure() {
- bind(Configuration.class).toInstance(configuration);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java b/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java
deleted file mode 100644
index 76cbbdf..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.apache.twill.discovery.InMemoryDiscoveryService;
-import org.apache.twill.discovery.ServiceDiscovered;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.zookeeper.ZKClientService;
-
-/**
- * Provides access to Google Guice modules for in-memory, single-node, and distributed operation for
- * {@link DiscoveryService} and {@link DiscoveryServiceClient}.
- */
-public final class DiscoveryModules {
-
- public Module getInMemoryModules() {
- return new InMemoryDiscoveryModule();
- }
-
- public Module getSingleNodeModules() {
- return new InMemoryDiscoveryModule();
- }
-
- public Module getDistributedModules() {
- return new ZKDiscoveryModule();
- }
-
- private static final class InMemoryDiscoveryModule extends AbstractModule {
-
- // ensuring to be singleton across JVM
- private static final InMemoryDiscoveryService IN_MEMORY_DISCOVERY_SERVICE = new InMemoryDiscoveryService();
-
- @Override
- protected void configure() {
- InMemoryDiscoveryService discovery = IN_MEMORY_DISCOVERY_SERVICE;
- bind(DiscoveryService.class).toInstance(discovery);
- bind(DiscoveryServiceClient.class).toInstance(discovery);
- }
- }
-
- private static final class ZKDiscoveryModule extends PrivateModule {
-
- @Override
- protected void configure() {
- expose(DiscoveryService.class);
- expose(DiscoveryServiceClient.class);
- }
-
- @Provides
- @Singleton
- private ZKDiscoveryService providesZKDiscoveryService(ZKClientService zkClient) {
- return new ZKDiscoveryService(zkClient);
- }
-
- @Provides
- @Singleton
- private DiscoveryService providesDiscoveryService(final ZKClientService zkClient,
- final ZKDiscoveryService delegate) {
- return new DiscoveryService() {
- @Override
- public Cancellable register(Discoverable discoverable) {
- if (!zkClient.isRunning()) {
- zkClient.startAndWait();
- }
- return delegate.register(discoverable);
- }
- };
- }
-
- @Provides
- @Singleton
- private DiscoveryServiceClient providesDiscoveryServiceClient(final ZKClientService zkClient,
- final ZKDiscoveryService delegate) {
- return new DiscoveryServiceClient() {
- @Override
- public ServiceDiscovered discover(String s) {
- if (!zkClient.isRunning()) {
- zkClient.startAndWait();
- }
- return delegate.discover(s);
- }
- };
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java
deleted file mode 100644
index c6df2b3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.PooledClientProvider;
-import co.cask.tephra.distributed.ThreadLocalClientProvider;
-import co.cask.tephra.distributed.ThriftClientProvider;
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-
-/**
- * Provides Guice binding for {@link co.cask.tephra.distributed.ThriftClientProvider}.
- */
-public class TransactionClientModule extends AbstractModule {
-
- @Override
- protected void configure() {
- bind(ThriftClientProvider.class).toProvider(ThriftClientProviderSupplier.class);
- }
-
- /**
- * Provides implementation of {@link co.cask.tephra.distributed.ThriftClientProvider}
- * based on configuration.
- */
- @Singleton
- private static final class ThriftClientProviderSupplier implements Provider<ThriftClientProvider> {
-
- private final Configuration cConf;
- private DiscoveryServiceClient discoveryServiceClient;
-
- @Inject
- ThriftClientProviderSupplier(Configuration cConf) {
- this.cConf = cConf;
- }
-
- @Inject(optional = true)
- void setDiscoveryServiceClient(DiscoveryServiceClient discoveryServiceClient) {
- this.discoveryServiceClient = discoveryServiceClient;
- }
-
- @Override
- public ThriftClientProvider get() {
- // configure the client provider
- String provider = cConf.get(TxConstants.Service.CFG_DATA_TX_CLIENT_PROVIDER,
- TxConstants.Service.DEFAULT_DATA_TX_CLIENT_PROVIDER);
- ThriftClientProvider clientProvider;
- if ("pool".equals(provider)) {
- clientProvider = new PooledClientProvider(cConf, discoveryServiceClient);
- } else if ("thread-local".equals(provider)) {
- clientProvider = new ThreadLocalClientProvider(cConf, discoveryServiceClient);
- } else {
- String message = "Unknown Transaction Service Client Provider '" + provider + "'.";
- throw new IllegalArgumentException(message);
- }
- return clientProvider;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
deleted file mode 100644
index c88f742..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.DefaultTransactionExecutor;
-import co.cask.tephra.TransactionExecutor;
-import co.cask.tephra.TransactionExecutorFactory;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.metrics.DefaultMetricsCollector;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.persist.HDFSTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-/**
- * Guice bindings for running in distributed mode on a cluster.
- */
-final class TransactionDistributedModule extends AbstractModule {
-
- @Override
- protected void configure() {
- bind(SnapshotCodecProvider.class).in(Singleton.class);
- bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
- .to(HDFSTransactionStateStorage.class).in(Singleton.class);
- bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
-
- bind(TransactionManager.class).in(Singleton.class);
- bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
- bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
-
- install(new FactoryModuleBuilder()
- .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
- .build(TransactionExecutorFactory.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java
deleted file mode 100644
index d9780e1..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.DefaultTransactionExecutor;
-import co.cask.tephra.TransactionExecutor;
-import co.cask.tephra.TransactionExecutorFactory;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.NoOpTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-
-/**
- * Guice bindings for running completely in-memory (no persistence). This should only be used for
- * test classes, as the transaction state cannot be recovered in the case of a failure.
- */
-public class TransactionInMemoryModule extends AbstractModule {
- public TransactionInMemoryModule() {
- }
-
- @Override
- protected void configure() {
- bind(SnapshotCodecProvider.class).in(Singleton.class);
- bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class);
- bind(TransactionManager.class).in(Singleton.class);
- bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
- // no metrics output for in-memory
- bind(MetricsCollector.class).to(TxMetricsCollector.class);
-
- install(new FactoryModuleBuilder()
- .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
- .build(TransactionExecutorFactory.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java
deleted file mode 100644
index 720c84c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.DefaultTransactionExecutor;
-import co.cask.tephra.TransactionExecutor;
-import co.cask.tephra.TransactionExecutorFactory;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.DefaultMetricsCollector;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.persist.LocalFileTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-/**
- * Guice bindings for running in single-node mode (persistence to local disk and in-memory client).
- */
-final class TransactionLocalModule extends AbstractModule {
-
- @Override
- protected void configure() {
- bind(SnapshotCodecProvider.class).in(Singleton.class);
- bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
- .to(LocalFileTransactionStateStorage.class).in(Singleton.class);
- bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
-
- bind(TransactionManager.class).in(Singleton.class);
- bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
- bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
-
- install(new FactoryModuleBuilder()
- .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
- .build(TransactionExecutorFactory.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java
deleted file mode 100644
index 551fb3a..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import com.google.inject.Module;
-
-/**
- * Provides access to Google Guice modules for in-memory, single-node, and distributed operation.
- */
-public class TransactionModules {
- public TransactionModules() {
- }
-
- public Module getInMemoryModules() {
- return new TransactionInMemoryModule();
- }
-
- public Module getSingleNodeModules() {
- return new TransactionLocalModule();
- }
-
- public Module getDistributedModules() {
- return new TransactionDistributedModule();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java
deleted file mode 100644
index 777354c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.NoOpTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import com.google.inject.name.Names;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A provider for {@link TransactionStateStorage} that provides different
- * {@link TransactionStateStorage} implementation based on configuration.
- */
-@Singleton
-public final class TransactionStateStorageProvider implements Provider<TransactionStateStorage> {
-
- private final Configuration cConf;
- private final Injector injector;
-
- @Inject
- TransactionStateStorageProvider(Configuration cConf, Injector injector) {
- this.cConf = cConf;
- this.injector = injector;
- }
-
- @Override
- public TransactionStateStorage get() {
- if (cConf.getBoolean(TxConstants.Manager.CFG_DO_PERSIST, true)) {
- return injector.getInstance(Key.get(TransactionStateStorage.class, Names.named("persist")));
- } else {
- return injector.getInstance(NoOpTransactionStateStorage.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
deleted file mode 100644
index ebc1df7..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.zookeeper.TephraZKClientService;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Provides Guice binding to {@link ZKClient} and {@link ZKClientService}.
- */
-public class ZKModule extends AbstractModule {
-
- @Override
- protected void configure() {
- /**
- * ZKClientService is provided by the provider method
- * {@link #provideZKClientService(org.apache.hadoop.conf.Configuration)}.
- */
- bind(ZKClient.class).to(ZKClientService.class);
- }
-
- @Provides
- @Singleton
- private ZKClientService provideZKClientService(Configuration conf) {
- String zkStr = conf.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
- if (zkStr == null) {
- // Default to HBase one.
- zkStr = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM);
- }
-
- int timeOut = conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT);
- ZKClientService zkClientService = new TephraZKClientService(zkStr, timeOut, null,
- ArrayListMultimap.<String, byte[]>create());
- return ZKClientServices.delegate(
- ZKClients.reWatchOnExpire(
- ZKClients.retryOnFailure(zkClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
- )
- )
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java
deleted file mode 100644
index 8203494..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * An decoder to help read snapshots in binary format.
- */
-public final class BinaryDecoder {
-
- private final InputStream input;
-
- /**
- * @param input Stream to read from.
- */
- public BinaryDecoder(InputStream input) {
- this.input = input;
- }
-
- /**
- * Read one int from the input.
- * @return the read number
- * @throws java.io.IOException If there is IO error.
- * @throws java.io.EOFException If end of file reached.
- */
- public int readInt() throws IOException {
- int val = 0;
- int shift = 0;
- int b = readByte();
- while (b > 0x7f) {
- val ^= (b & 0x7f) << shift;
- shift += 7;
- b = readByte();
- }
- val ^= b << shift;
- return (val >>> 1) ^ -(val & 1);
- }
-
- /**
- * Read one long int from the input.
- * @return the read number
- * @throws java.io.IOException If there is IO error.
- * @throws java.io.EOFException If end of file reached.
- */
- public long readLong() throws IOException {
- long val = 0;
- int shift = 0;
- int b = readByte();
- while (b > 0x7f) {
- val ^= (long) (b & 0x7f) << shift;
- shift += 7;
- b = readByte();
- }
- val ^= (long) b << shift;
- return (val >>> 1) ^ -(val & 1);
- }
-
- /**
- * Read a byte sequence. First read an int to indicate how many bytes to read, then that many bytes.
- * @return the read bytes as a byte array
- * @throws java.io.IOException If there is IO error.
- * @throws java.io.EOFException If end of file reached.
- */
- public byte[] readBytes() throws IOException {
- int toRead = readInt();
- byte[] bytes = new byte[toRead];
- while (toRead > 0) {
- int byteRead = input.read(bytes, bytes.length - toRead, toRead);
- if (byteRead == -1) {
- throw new EOFException();
- }
- toRead -= byteRead;
- }
- return bytes;
- }
-
- /**
- * Reads a single byte value.
- *
- * @return The byte value read.
- * @throws java.io.IOException If there is IO error.
- * @throws java.io.EOFException If end of file reached.
- */
- private int readByte() throws IOException {
- int b = input.read();
- if (b == -1) {
- throw new EOFException();
- }
- return b;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java
deleted file mode 100644
index bc1bce0..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * An encoder to help encode snapshots in binary format.
- */
-public final class BinaryEncoder {
-
- private final OutputStream output;
-
- /**
- * @param output stream to write to
- */
- public BinaryEncoder(OutputStream output) {
- this.output = output;
- }
-
- /**
- * write a single int value.
- * @throws java.io.IOException If there is IO error.
- */
- public BinaryEncoder writeInt(int i) throws IOException {
- // Compute the zig-zag value. First double the value and flip the bit if the input is negative.
- int val = (i << 1) ^ (i >> 31);
-
- if ((val & ~0x7f) != 0) {
- output.write(0x80 | val & 0x7f);
- val >>>= 7;
- while (val > 0x7f) {
- output.write(0x80 | val & 0x7f);
- val >>>= 7;
- }
- }
- output.write(val);
-
- return this;
- }
-
- /**
- * write a single long int value.
- * @throws java.io.IOException If there is IO error.
- */
- public BinaryEncoder writeLong(long l) throws IOException {
- // Compute the zig-zag value. First double the value and flip the bit if the input is negative.
- long val = (l << 1) ^ (l >> 63);
-
- if ((val & ~0x7f) != 0) {
- output.write((int) (0x80 | val & 0x7f));
- val >>>= 7;
- while (val > 0x7f) {
- output.write((int) (0x80 | val & 0x7f));
- val >>>= 7;
- }
- }
- output.write((int) val);
-
- return this;
- }
-
- /**
- * write a sequence of bytes. First writes the number of bytes as an int, then the bytes themselves.
- * @throws java.io.IOException If there is IO error.
- */
- public BinaryEncoder writeBytes(byte[] bytes) throws IOException {
- writeLong(bytes.length);
- output.write(bytes, 0, bytes.length);
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java
deleted file mode 100644
index d09d79d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
- * and its elements to {@code byte[]}.
- * @deprecated This codec is now deprecated and is replaced by {@link SnapshotCodecV2}.
- */
-@Deprecated
-public class DefaultSnapshotCodec implements SnapshotCodec {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultSnapshotCodec.class);
-
- @Override
- public int getVersion() {
- return 1;
- }
-
- @Override
- public void encode(OutputStream out, TransactionSnapshot snapshot) {
- try {
- BinaryEncoder encoder = new BinaryEncoder(out);
-
- encoder.writeLong(snapshot.getTimestamp());
- encoder.writeLong(snapshot.getReadPointer());
- encoder.writeLong(snapshot.getWritePointer());
- encodeInvalid(encoder, snapshot.getInvalid());
- encodeInProgress(encoder, snapshot.getInProgress());
- encodeChangeSets(encoder, snapshot.getCommittingChangeSets());
- encodeChangeSets(encoder, snapshot.getCommittedChangeSets());
-
- } catch (IOException e) {
- LOG.error("Unable to serialize transaction state: ", e);
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public TransactionSnapshot decode(InputStream in) {
- BinaryDecoder decoder = new BinaryDecoder(in);
-
- try {
- TransactionVisibilityState minTxSnapshot = decodeTransactionVisibilityState(in);
- NavigableMap<Long, Set<ChangeId>> committing = decodeChangeSets(decoder);
- NavigableMap<Long, Set<ChangeId>> committed = decodeChangeSets(decoder);
- return new TransactionSnapshot(minTxSnapshot.getTimestamp(), minTxSnapshot.getReadPointer(),
- minTxSnapshot.getWritePointer(), minTxSnapshot.getInvalid(),
- minTxSnapshot.getInProgress(), committing, committed);
- } catch (IOException e) {
- LOG.error("Unable to deserialize transaction state: ", e);
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
- BinaryDecoder decoder = new BinaryDecoder(in);
- try {
- long timestamp = decoder.readLong();
- long readPointer = decoder.readLong();
- long writePointer = decoder.readLong();
- Collection<Long> invalid = decodeInvalid(decoder);
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress = decodeInProgress(decoder);
- return new TransactionSnapshot(timestamp, readPointer, writePointer, invalid, inProgress);
- } catch (IOException e) {
- LOG.error("Unable to deserialize transaction state: ", e);
- throw Throwables.propagate(e);
- }
- }
-
- private void encodeInvalid(BinaryEncoder encoder, Collection<Long> invalid) throws IOException {
- if (!invalid.isEmpty()) {
- encoder.writeInt(invalid.size());
- for (long invalidTx : invalid) {
- encoder.writeLong(invalidTx);
- }
- }
- encoder.writeInt(0); // zero denotes end of list as per AVRO spec
- }
-
- private Collection<Long> decodeInvalid(BinaryDecoder decoder) throws IOException {
- int size = decoder.readInt();
- Collection<Long> invalid = Lists.newArrayListWithCapacity(size);
- while (size != 0) { // zero denotes end of list as per AVRO spec
- for (int remaining = size; remaining > 0; --remaining) {
- invalid.add(decoder.readLong());
- }
- size = decoder.readInt();
- }
- return invalid;
- }
-
- protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
- throws IOException {
-
- if (!inProgress.isEmpty()) {
- encoder.writeInt(inProgress.size());
- for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
- encoder.writeLong(entry.getKey()); // tx id
- encoder.writeLong(entry.getValue().getExpiration());
- encoder.writeLong(entry.getValue().getVisibilityUpperBound());
- }
- }
- encoder.writeInt(0); // zero denotes end of list as per AVRO spec
- }
-
- protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
- throws IOException {
-
- int size = decoder.readInt();
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
- while (size != 0) { // zero denotes end of list as per AVRO spec
- for (int remaining = size; remaining > 0; --remaining) {
- long txId = decoder.readLong();
- long expiration = decoder.readLong();
- long visibilityUpperBound = decoder.readLong();
- inProgress.put(txId,
- new TransactionManager.InProgressTx(visibilityUpperBound, expiration));
- }
- size = decoder.readInt();
- }
- return inProgress;
- }
-
- private void encodeChangeSets(BinaryEncoder encoder, Map<Long, Set<ChangeId>> changes) throws IOException {
- if (!changes.isEmpty()) {
- encoder.writeInt(changes.size());
- for (Map.Entry<Long, Set<ChangeId>> entry : changes.entrySet()) {
- encoder.writeLong(entry.getKey());
- encodeChanges(encoder, entry.getValue());
- }
- }
- encoder.writeInt(0); // zero denotes end of list as per AVRO spec
- }
-
- private NavigableMap<Long, Set<ChangeId>> decodeChangeSets(BinaryDecoder decoder) throws IOException {
- int size = decoder.readInt();
- NavigableMap<Long, Set<ChangeId>> changeSets = new TreeMap<Long, Set<ChangeId>>();
- while (size != 0) { // zero denotes end of list as per AVRO spec
- for (int remaining = size; remaining > 0; --remaining) {
- changeSets.put(decoder.readLong(), decodeChanges(decoder));
- }
- size = decoder.readInt();
- }
- return changeSets;
- }
-
- private void encodeChanges(BinaryEncoder encoder, Set<ChangeId> changes) throws IOException {
- if (!changes.isEmpty()) {
- encoder.writeInt(changes.size());
- for (ChangeId change : changes) {
- encoder.writeBytes(change.getKey());
- }
- }
- encoder.writeInt(0); // zero denotes end of list as per AVRO spec
- }
-
- private Set<ChangeId> decodeChanges(BinaryDecoder decoder) throws IOException {
- int size = decoder.readInt();
- HashSet<ChangeId> changes = Sets.newHashSetWithExpectedSize(size);
- while (size != 0) { // zero denotes end of list as per AVRO spec
- for (int remaining = size; remaining > 0; --remaining) {
- changes.add(new ChangeId(decoder.readBytes()));
- }
- size = decoder.readInt();
- }
- // todo is there an immutable hash set?
- return changes;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java
deleted file mode 100644
index 8e9624e..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionVisibilityState;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Interface to decode and encode a transaction snapshot. Each codec implements one version of the encoding.
- * It need not include the version when encoding the snapshot.
- */
-public interface SnapshotCodec {
-
- /**
- * @return the version of the encoding implemented by the codec.
- */
- int getVersion();
-
- /**
- * Encode a transaction snapshot into an output stream.
- * @param out the output stream to write to
- * @param snapshot the snapshot to encode
- */
- void encode(OutputStream out, TransactionSnapshot snapshot);
-
- /**
- * Decode a transaction snapshot from an input stream.
- * @param in the input stream to read from
- * @return the decoded snapshot
- */
- TransactionSnapshot decode(InputStream in);
-
- /**
- * Decode transaction visibility state from an input stream.
- * @param in the input stream to read from
- * @return {@link TransactionVisibilityState}
- */
- TransactionVisibilityState decodeTransactionVisibilityState(InputStream in);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java
deleted file mode 100644
index 28d829c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.inject.Inject;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.SortedMap;
-import javax.annotation.Nonnull;
-
-/**
- * Maintains the codecs for all known versions of the transaction snapshot encoding.
- */
-public class SnapshotCodecProvider implements SnapshotCodec {
-
- private static final Logger LOG = LoggerFactory.getLogger(SnapshotCodecProvider.class);
-
- private final SortedMap<Integer, SnapshotCodec> codecs = Maps.newTreeMap();
-
- @Inject
- public SnapshotCodecProvider(Configuration configuration) {
- initialize(configuration);
- }
-
- /**
- * Register all codec specified in the configuration with this provider.
- * There can only be one codec for a given version.
- */
- private void initialize(Configuration configuration) {
- String[] codecClassNames = configuration.getTrimmedStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
- List<Class> codecClasses = Lists.newArrayList();
- if (codecClassNames != null) {
- for (String clsName : codecClassNames) {
- try {
- codecClasses.add(Class.forName(clsName));
- } catch (ClassNotFoundException cnfe) {
- LOG.warn("Unable to load class configured for " + TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES
- + ": " + clsName, cnfe);
- }
- }
- }
-
- if (codecClasses.size() == 0) {
- codecClasses.addAll(Arrays.asList(TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES));
- }
- for (Class<?> codecClass : codecClasses) {
- try {
- SnapshotCodec codec = (SnapshotCodec) (codecClass.newInstance());
- codecs.put(codec.getVersion(), codec);
- LOG.debug("Using snapshot codec {} for snapshots of version {}", codecClass.getName(), codec.getVersion());
- } catch (Exception e) {
- LOG.warn("Error instantiating snapshot codec {}. Skipping.", codecClass.getName(), e);
- }
- }
- }
-
- /**
- * Retrieve the codec for a particular version of the encoding.
- * @param version the version of interest
- * @return the corresponding codec
- * @throws java.lang.IllegalArgumentException if the version is not known
- */
- @Nonnull
- @VisibleForTesting
- SnapshotCodec getCodecForVersion(int version) {
- SnapshotCodec codec = codecs.get(version);
- if (codec == null) {
- throw new IllegalArgumentException(String.format("Version %d of snapshot encoding is not supported", version));
- }
- return codec;
- }
-
- /**
- * Retrieve the current snapshot codec, that is, the codec with the highest known version.
- * @return the current codec
- * @throws java.lang.IllegalStateException if no codecs are registered
- */
- private SnapshotCodec getCurrentCodec() {
- if (codecs.isEmpty()) {
- throw new IllegalStateException(String.format("No codecs are registered."));
- }
- return codecs.get(codecs.lastKey());
- }
-
- // Return the appropriate codec for the version in InputStream
- private SnapshotCodec getCodec(InputStream in) {
- BinaryDecoder decoder = new BinaryDecoder(in);
- int persistedVersion;
- try {
- persistedVersion = decoder.readInt();
- } catch (IOException e) {
- LOG.error("Unable to read transaction state version: ", e);
- throw Throwables.propagate(e);
- }
- return getCodecForVersion(persistedVersion);
- }
-
- @Override
- public int getVersion() {
- return getCurrentCodec().getVersion();
- }
-
- @Override
- public TransactionSnapshot decode(InputStream in) {
- return getCodec(in).decode(in);
- }
-
- @Override
- public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
- return getCodec(in).decodeTransactionVisibilityState(in);
- }
-
- @Override
- public void encode(OutputStream out, TransactionSnapshot snapshot) {
- SnapshotCodec codec = getCurrentCodec();
- try {
- new BinaryEncoder(out).writeInt(codec.getVersion());
- } catch (IOException e) {
- LOG.error("Unable to write transaction state version: ", e);
- throw Throwables.propagate(e);
- }
- codec.encode(out, snapshot);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java
deleted file mode 100644
index 5d07ba5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import com.google.common.collect.Maps;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-/**
- * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
- * and its elements to {@code byte[]}.
- */
-public class SnapshotCodecV2 extends DefaultSnapshotCodec {
- @Override
- public int getVersion() {
- return 2;
- }
-
- @Override
- protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
- throws IOException {
-
- if (!inProgress.isEmpty()) {
- encoder.writeInt(inProgress.size());
- for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
- encoder.writeLong(entry.getKey()); // tx id
- encoder.writeLong(entry.getValue().getExpiration());
- encoder.writeLong(entry.getValue().getVisibilityUpperBound());
- encoder.writeInt(entry.getValue().getType().ordinal());
- }
- }
- encoder.writeInt(0); // zero denotes end of list as per AVRO spec
- }
-
- @Override
- protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
- throws IOException {
-
- int size = decoder.readInt();
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
- while (size != 0) { // zero denotes end of list as per AVRO spec
- for (int remaining = size; remaining > 0; --remaining) {
- long txId = decoder.readLong();
- long expiration = decoder.readLong();
- long visibilityUpperBound = decoder.readLong();
- int txTypeIdx = decoder.readInt();
- TransactionType txType;
- try {
- txType = TransactionType.values()[txTypeIdx];
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
- }
- inProgress.put(txId,
- new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType,
- new LongArrayList()));
- }
- size = decoder.readInt();
- }
- return inProgress;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
deleted file mode 100644
index b11821a..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-/**
- * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
- * and its elements to {@code byte[]}.
- *
- * <p>The serialization/deserialization of this codec is the same as that performed by {@link SnapshotCodecV2},
- * but a new version number is used to allow easy migration from projects using deprecated codecs with
- * conflicting version numbers.</p>
- */
-public class SnapshotCodecV3 extends SnapshotCodecV2 {
- @Override
- public int getVersion() {
- return 3;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java
deleted file mode 100644
index 41d30f2..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import com.google.common.collect.Maps;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-/**
- * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
- * and its elements to {@code byte[]}.
- *
- */
-public class SnapshotCodecV4 extends SnapshotCodecV2 {
- @Override
- public int getVersion() {
- return 4;
- }
-
- @Override
- protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
- throws IOException {
-
- if (!inProgress.isEmpty()) {
- encoder.writeInt(inProgress.size());
- for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
- encoder.writeLong(entry.getKey()); // tx id
- encoder.writeLong(entry.getValue().getExpiration());
- encoder.writeLong(entry.getValue().getVisibilityUpperBound());
- encoder.writeInt(entry.getValue().getType().ordinal());
- // write checkpoint tx IDs
- LongArrayList checkpointPointers = entry.getValue().getCheckpointWritePointers();
- if (checkpointPointers != null && !checkpointPointers.isEmpty()) {
- encoder.writeInt(checkpointPointers.size());
- for (int i = 0; i < checkpointPointers.size(); i++) {
- encoder.writeLong(checkpointPointers.getLong(i));
- }
- }
- encoder.writeInt(0);
- }
- }
- encoder.writeInt(0); // zero denotes end of list as per AVRO spec
- }
-
- @Override
- protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
- throws IOException {
-
- int size = decoder.readInt();
- NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
- while (size != 0) { // zero denotes end of list as per AVRO spec
- for (int remaining = size; remaining > 0; --remaining) {
- long txId = decoder.readLong();
- long expiration = decoder.readLong();
- long visibilityUpperBound = decoder.readLong();
- int txTypeIdx = decoder.readInt();
- TransactionType txType;
- try {
- txType = TransactionType.values()[txTypeIdx];
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
- }
- // read checkpoint tx IDs
- int checkpointPointerSize = decoder.readInt();
- LongArrayList checkpointPointers = new LongArrayList(checkpointPointerSize);
- while (checkpointPointerSize != 0) {
- for (int checkpointRemaining = checkpointPointerSize; checkpointRemaining > 0; --checkpointRemaining) {
- checkpointPointers.add(decoder.readLong());
- }
- checkpointPointerSize = decoder.readInt();
- }
- inProgress.put(txId,
- new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType, checkpointPointers));
- }
- size = decoder.readInt();
- }
- return inProgress;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java
deleted file mode 100644
index e2d044c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains interfaces and implementations for encoding and decoding transaction snapshots.
- */
-package co.cask.tephra.snapshot;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java b/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java
deleted file mode 100644
index e830524..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Provides {@code org.apache.hadoop.conf.Configuration} instances, constructed by the correct version used
- * for the runtime.
- */
-public class ConfigurationFactory implements Provider<Configuration> {
- private static class ConfigurationProviderFactory extends HBaseVersionSpecificFactory<ConfigurationProvider> {
- @Override
- protected String getHBase96Classname() {
- return "co.cask.tephra.hbase96.HBase96ConfigurationProvider";
- }
-
- @Override
- protected String getHBase98Classname() {
- return "co.cask.tephra.hbase98.HBase98ConfigurationProvider";
- }
-
- @Override
- protected String getHBase10Classname() {
- return "co.cask.tephra.hbase10.HBase10ConfigurationProvider";
- }
-
- @Override
- protected String getHBase11Classname() {
- return "co.cask.tephra.hbase11.HBase11ConfigurationProvider";
- }
-
- @Override
- protected String getHBase10CDHClassname() {
- return "co.cask.tephra.hbase10cdh.HBase10ConfigurationProvider";
- }
- }
-
- private final ConfigurationProvider provider = new ConfigurationProviderFactory().get();
-
- /**
- * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
- */
- @Override
- public Configuration get() {
- return provider.get();
- }
-
- /**
- * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
- *
- * @param baseConf additional configuration properties to merge on to the classpath configuration
- * @return the merged configuration
- */
- public Configuration get(Configuration baseConf) {
- return provider.get(baseConf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java b/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java
deleted file mode 100644
index e88acf8..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Provides {@code Configuration} instances, constructed by the HBase version on which we are running.
- */
-public abstract class ConfigurationProvider implements Provider<Configuration> {
- @Override
- public abstract Configuration get();
-
- public abstract Configuration get(Configuration baseConf);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java b/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java
deleted file mode 100644
index 84311c9..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Detects the currently loaded HBase version. It is assumed that only one HBase version is loaded at a time,
- * since using more than one HBase version within the same process will require classloader isolation anyway.
- */
-public class HBaseVersion {
- private static final String HBASE_94_VERSION = "0.94";
- private static final String HBASE_96_VERSION = "0.96";
- private static final String HBASE_98_VERSION = "0.98";
- private static final String HBASE_10_VERSION = "1.0";
- private static final String HBASE_11_VERSION = "1.1";
- private static final String HBASE_12_VERSION = "1.2";
- private static final String CDH_CLASSIFIER = "cdh";
-
- private static final Logger LOG = LoggerFactory.getLogger(HBaseVersion.class);
-
- /**
- * Represents the major version of the HBase library that is currently loaded.
- */
- public enum Version {
- HBASE_94("0.94"),
- HBASE_96("0.96"),
- HBASE_98("0.98"),
- HBASE_10("1.0"),
- HBASE_10_CDH("1.0-cdh"),
- HBASE_11("1.1"),
- HBASE_12_CDH("1.2-cdh"),
- UNKNOWN("unknown");
-
- final String majorVersion;
-
- Version(String majorVersion) {
- this.majorVersion = majorVersion;
- }
-
- public String getMajorVersion() {
- return majorVersion;
- }
- }
-
- private static Version currentVersion;
- private static String versionString;
- static {
- try {
- Class versionInfoClass = Class.forName("org.apache.hadoop.hbase.util.VersionInfo");
- Method versionMethod = versionInfoClass.getMethod("getVersion");
- versionString = (String) versionMethod.invoke(null);
- if (versionString.startsWith(HBASE_94_VERSION)) {
- currentVersion = Version.HBASE_94;
- } else if (versionString.startsWith(HBASE_96_VERSION)) {
- currentVersion = Version.HBASE_96;
- } else if (versionString.startsWith(HBASE_98_VERSION)) {
- currentVersion = Version.HBASE_98;
- } else if (versionString.startsWith(HBASE_10_VERSION)) {
- VersionNumber ver = VersionNumber.create(versionString);
- if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
- currentVersion = Version.HBASE_10_CDH;
- } else {
- currentVersion = Version.HBASE_10;
- }
- } else if (versionString.startsWith(HBASE_11_VERSION)) {
- currentVersion = Version.HBASE_11;
- } else if (versionString.startsWith(HBASE_12_VERSION)) {
- VersionNumber ver = VersionNumber.create(versionString);
- if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
- currentVersion = Version.HBASE_12_CDH;
- } else {
- // CDH 5.7 comes with HBase version 1.2.0-CDH5.7.0. However currently there is no
- // other hadoop distribution that uses HBase 1.2, so the version is set here to UNKNOWN.
- currentVersion = Version.UNKNOWN;
- }
- } else {
- currentVersion = Version.UNKNOWN;
- }
- } catch (Throwable e) {
- // must be a class loading exception, HBase is not there
- LOG.error("Unable to determine HBase version from string '{}', are HBase classes available?", versionString);
- LOG.error("Exception was: ", e);
- currentVersion = Version.UNKNOWN;
- }
- }
-
- /**
- * Returns the major version of the currently loaded HBase library.
- */
- public static Version get() {
- return currentVersion;
- }
-
- /**
- * Returns the full version string for the currently loaded HBase library.
- */
- public static String getVersionString() {
- return versionString;
- }
-
- /**
- * Prints out the HBase {@link Version} enum value for the current version of HBase on the classpath.
- */
- public static void main(String[] args) {
- boolean verbose = args.length == 1 && "-v".equals(args[0]);
- Version version = HBaseVersion.get();
- System.out.println(version.getMajorVersion());
- if (verbose) {
- System.out.println("versionString=" + getVersionString());
- }
- }
-
- /**
- * Utility class to parse apart version number components. The version string provided is expected to be in
- * the format: major[.minor[.patch[.last]][-classifier][-SNAPSHOT]
- *
- * <p>Only the major version number is actually required.</p>
- */
- public static class VersionNumber {
- private static final Pattern PATTERN =
- Pattern.compile("(\\d+)(\\.(\\d+))?(\\.(\\d+))?(\\.(\\d+))?(\\-(?!SNAPSHOT)([^\\-]+))?(\\-SNAPSHOT)?");
-
- private Integer major;
- private Integer minor;
- private Integer patch;
- private Integer last;
- private String classifier;
- private boolean snapshot;
-
- private VersionNumber(Integer major, Integer minor, Integer patch, Integer last,
- String classifier, boolean snapshot) {
- this.major = major;
- this.minor = minor;
- this.patch = patch;
- this.last = last;
- this.classifier = classifier;
- this.snapshot = snapshot;
- }
-
- public Integer getMajor() {
- return major;
- }
-
- public Integer getMinor() {
- return minor;
- }
-
- public Integer getPatch() {
- return patch;
- }
-
- public Integer getLast() {
- return last;
- }
-
- public String getClassifier() {
- return classifier;
- }
-
- public boolean isSnapshot() {
- return snapshot;
- }
-
- public static VersionNumber create(String versionString) throws ParseException {
- Matcher matcher = PATTERN.matcher(versionString);
- if (matcher.matches()) {
- String majorString = matcher.group(1);
- String minorString = matcher.group(3);
- String patchString = matcher.group(5);
- String last = matcher.group(7);
- String classifier = matcher.group(9);
- String snapshotString = matcher.group(10);
- return new VersionNumber(new Integer(majorString),
- minorString != null ? new Integer(minorString) : null,
- patchString != null ? new Integer(patchString) : null,
- last != null ? new Integer(last) : null,
- classifier,
- "-SNAPSHOT".equals(snapshotString));
- }
- throw new ParseException(
- "Input string did not match expected pattern: major[.minor[.patch]][-classifier][-SNAPSHOT]", 0);
- }
- }
-}