You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@whirr.apache.org by as...@apache.org on 2011/04/18 22:02:02 UTC
svn commit: r1094718 - in /incubator/whirr/trunk: ./ core/
core/src/main/java/org/apache/whirr/
core/src/main/java/org/apache/whirr/service/
core/src/main/java/org/apache/whirr/service/jclouds/
core/src/main/java/org/apache/whirr/util/ core/src/test/ja...
Author: asavu
Date: Mon Apr 18 20:02:02 2011
New Revision: 1094718
URL: http://svn.apache.org/viewvc?rev=1094718&view=rev
Log:
WHIRR-280. Create a blob cache that could be used for storing local files (asavu)
Added:
incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/jclouds/SaveHttpResponseTo.java
incubator/whirr/trunk/core/src/main/java/org/apache/whirr/util/BlobCache.java
incubator/whirr/trunk/core/src/test/java/org/apache/whirr/util/integration/BlobCacheTest.java
incubator/whirr/trunk/core/src/test/resources/whirr-core-test.properties
Modified:
incubator/whirr/trunk/CHANGES.txt
incubator/whirr/trunk/core/pom.xml
incubator/whirr/trunk/core/src/main/java/org/apache/whirr/ClusterSpec.java
incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/ClusterActionHandlerSupport.java
Modified: incubator/whirr/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/CHANGES.txt?rev=1094718&r1=1094717&r2=1094718&view=diff
==============================================================================
--- incubator/whirr/trunk/CHANGES.txt (original)
+++ incubator/whirr/trunk/CHANGES.txt Mon Apr 18 20:02:02 2011
@@ -39,6 +39,9 @@ Trunk (unreleased changes)
WHIRR-279. Create ClusterSpec aware BlobStoreContext factory class (asavu)
+ WHIRR-280. Create a blob cache that could be used for storing local
+ files (asavu)
+
BUG FIXES
WHIRR-253. ZooKeeper service should only authorize ingress to ZooKeeper
Modified: incubator/whirr/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/core/pom.xml?rev=1094718&r1=1094717&r2=1094718&view=diff
==============================================================================
--- incubator/whirr/trunk/core/pom.xml (original)
+++ incubator/whirr/trunk/core/pom.xml Mon Apr 18 20:02:02 2011
@@ -103,6 +103,24 @@
<artifactId>jsch</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <version>4.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
Modified: incubator/whirr/trunk/core/src/main/java/org/apache/whirr/ClusterSpec.java
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/core/src/main/java/org/apache/whirr/ClusterSpec.java?rev=1094718&r1=1094717&r2=1094718&view=diff
==============================================================================
--- incubator/whirr/trunk/core/src/main/java/org/apache/whirr/ClusterSpec.java (original)
+++ incubator/whirr/trunk/core/src/main/java/org/apache/whirr/ClusterSpec.java Mon Apr 18 20:02:02 2011
@@ -123,6 +123,8 @@ public class ClusterSpec {
BLOBSTORE_IDENTITY(String.class, false, "The blob store identity"),
BLOBSTORE_CREDENTIAL(String.class, false, "The blob store credential"),
+
+ BLOBSTORE_LOCATION_ID(String.class, false, "The blob store location ID"),
IMAGE_ID(String.class, false, "The ID of the image to use for " +
"instances. If not specified then a vanilla Linux image is " +
@@ -240,6 +242,8 @@ public class ClusterSpec {
private String publicKey;
private String locationId;
+ private String blobStoreLocationId;
+
private String imageId;
private String hardwareId;
@@ -295,6 +299,7 @@ public class ClusterSpec {
setHardwareMinRam(getInt(Property.HARDWARE_MIN_RAM, 1024));
setLocationId(getString(Property.LOCATION_ID));
+ setBlobStoreLocationId(getString(Property.BLOBSTORE_LOCATION_ID));
setClientCidrs(getList(Property.CLIENT_CIDRS));
setVersion(getString(Property.VERSION));
@@ -438,6 +443,10 @@ public class ClusterSpec {
return blobStoreCredential;
}
+ public String getBlobStoreLocationId() {
+ return blobStoreLocationId;
+ }
+
public String getServiceName() {
return serviceName;
}
@@ -522,6 +531,10 @@ public class ClusterSpec {
blobStoreCredential = credential;
}
+ public void setBlobStoreLocationId(String locationId) {
+ blobStoreLocationId = locationId;
+ }
+
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
@@ -675,6 +688,7 @@ public class ClusterSpec {
&& Objects.equal(hardwareId, that.hardwareId)
&& Objects.equal(hardwareMinRam, that.hardwareMinRam)
&& Objects.equal(locationId, that.locationId)
+ && Objects.equal(blobStoreLocationId, that.blobStoreLocationId)
&& Objects.equal(clientCidrs, that.clientCidrs)
&& Objects.equal(version, that.version)
;
@@ -686,7 +700,7 @@ public class ClusterSpec {
return Objects.hashCode(instanceTemplates, maxStartupRetries, provider,
identity, credential, blobStoreProvider, blobStoreIdentity, blobStoreCredential,
clusterName, serviceName, clusterUser, loginUser, publicKey, privateKey, imageId,
- hardwareId, locationId, clientCidrs, version, runUrlBase);
+ hardwareId, locationId, blobStoreLocationId, clientCidrs, version, runUrlBase);
}
public String toString() {
@@ -709,6 +723,7 @@ public class ClusterSpec {
.add("instanceSizeId", hardwareId)
.add("instanceMinRam", hardwareMinRam)
.add("locationId", locationId)
+ .add("blobStoreLocationId", blobStoreLocationId)
.add("clientCidrs", clientCidrs)
.add("version", version)
.toString();
Modified: incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/ClusterActionHandlerSupport.java
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/ClusterActionHandlerSupport.java?rev=1094718&r1=1094717&r2=1094718&view=diff
==============================================================================
--- incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/ClusterActionHandlerSupport.java (original)
+++ incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/ClusterActionHandlerSupport.java Mon Apr 18 20:02:02 2011
@@ -111,7 +111,7 @@ public abstract class ClusterActionHandl
} catch(ConfigurationException e) {
throw new IOException("Error loading " + defaultsPropertiesFile, e);
}
- }
+ }
/**
* A convenience method for adding a {@link RunUrlStatement} to a
Added: incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/jclouds/SaveHttpResponseTo.java
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/jclouds/SaveHttpResponseTo.java?rev=1094718&view=auto
==============================================================================
--- incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/jclouds/SaveHttpResponseTo.java (added)
+++ incubator/whirr/trunk/core/src/main/java/org/apache/whirr/service/jclouds/SaveHttpResponseTo.java Mon Apr 18 20:02:02 2011
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.whirr.service.jclouds;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.jclouds.scriptbuilder.domain.InterpretableStatement;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * This class is an improved version of the class already existing in jclouds
+ */
+public class SaveHttpResponseTo extends InterpretableStatement {
+ public SaveHttpResponseTo(String dir, String file, String method, URI endpoint, Multimap<String, String> headers) {
+ super(String.format("({md} %s && {cd} %s && [ ! -f %s ] && " +
+ "curl -C - -s -q -L --connect-timeout 10 --max-time 600 -X %s -s --retry 20 %s %s >%s)\n", dir, dir, file, method, Joiner.on(' ')
+ .join(Iterables.transform(headers.entries(), new Function<Map.Entry<String, String>, String>() {
+
+ @Override
+ public String apply(Map.Entry<String, String> from) {
+ return String.format("-H \"%s: %s\"", from.getKey(), from.getValue());
+ }
+
+ })), endpoint.toASCIIString(), file));
+ }
+
+}
+
Added: incubator/whirr/trunk/core/src/main/java/org/apache/whirr/util/BlobCache.java
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/core/src/main/java/org/apache/whirr/util/BlobCache.java?rev=1094718&view=auto
==============================================================================
--- incubator/whirr/trunk/core/src/main/java/org/apache/whirr/util/BlobCache.java (added)
+++ incubator/whirr/trunk/core/src/main/java/org/apache/whirr/util/BlobCache.java Mon Apr 18 20:02:02 2011
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.whirr.util;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.whirr.ClusterSpec;
+import org.apache.whirr.service.BlobStoreContextBuilder;
+import org.apache.whirr.service.ComputeServiceContextBuilder;
+import org.apache.whirr.service.jclouds.SaveHttpResponseTo;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.compute.ComputeService;
+import org.jclouds.compute.ComputeServiceContext;
+import org.jclouds.domain.Location;
+import org.jclouds.http.HttpRequest;
+import org.jclouds.scriptbuilder.domain.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Caches local files in a blob store container so that they can later be
+ * fetched through signed HTTP requests (see {@link #getAsSaveToStatement}).
+ *
+ * One instance is kept per {@link ClusterSpec}; obtain it with
+ * {@link #getInstance(ClusterSpec)}. The backing container is created lazily
+ * on the first upload. All created containers are removed by
+ * {@link #dropAndCloseAll()}, which is also registered as a JVM shutdown hook.
+ */
+public class BlobCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);
+
+  /* Registry of live caches; guarded by the class lock (static synchronized methods) */
+  private static final Map<ClusterSpec, BlobCache> instances = Maps.newHashMap();
+
+  static {
+    /* Ensure that all created containers are removed when the JVM stops */
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        BlobCache.dropAndCloseAll();
+      }
+    });
+  }
+
+  /**
+   * Returns the cache associated with the given cluster spec, creating it on
+   * first use.
+   *
+   * The spec is used as a map key and its hash code is computed from its
+   * fields, so it should not be modified after this call.
+   *
+   * @param spec the cluster specification providing the blob store settings
+   * @return the cache registered for {@code spec}
+   * @throws IOException if the blob store or compute context cannot be built
+   */
+  public synchronized static BlobCache getInstance(ClusterSpec spec) throws IOException {
+    BlobCache cache = instances.get(spec);
+    if (cache == null) {
+      cache = new BlobCache(spec);
+      instances.put(spec, cache);
+    }
+    return cache;
+  }
+
+  /**
+   * Removes the container of every registered cache, closes its blob store
+   * context and empties the registry.
+   *
+   * A failure on one cache does not prevent the remaining ones from being
+   * cleaned up; the first failure is rethrown once all caches were processed.
+   */
+  public synchronized static void dropAndCloseAll() {
+    RuntimeException firstFailure = null;
+    for (BlobCache instance : instances.values()) {
+      try {
+        instance.dropAndClose();
+      } catch (RuntimeException e) {
+        LOG.warn("Unable to remove blob cache '" + instance.container + "'", e);
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+    instances.clear();
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  BlobStoreContext context = null;
+  /* Name of the backing container; null until the first upload allocates it */
+  String container = null;
+  /* Location used when creating the container; null if none was selected */
+  Location defaultLocation = null;
+
+  private BlobCache(ClusterSpec spec) throws IOException {
+    context = BlobStoreContextBuilder.build(spec);
+    updateDefaultLocation(spec);
+  }
+
+  /**
+   * @return the location selected for the cache container, or {@code null}
+   *         when no matching location was found or none was requested
+   */
+  public Location getLocation() {
+    return defaultLocation;
+  }
+
+  /**
+   * Selects the location for the cache container. An explicit blob store
+   * location ID takes precedence; otherwise, if a compute location ID is set,
+   * the first blob store location sharing an ISO 3166 code with it is used.
+   * If nothing matches, {@code defaultLocation} stays {@code null}.
+   */
+  private void updateDefaultLocation(ClusterSpec spec) throws IOException {
+    if (spec.getBlobStoreLocationId() != null) {
+      /* find the location with the given Id */
+      for (Location loc : context.getBlobStore().listAssignableLocations()) {
+        if (loc.getId().equals(spec.getBlobStoreLocationId())) {
+          defaultLocation = loc;
+          break;
+        }
+      }
+      if (defaultLocation == null) {
+        LOG.warn("No blob store location found with this ID '{}'. " +
+            "Using default location.", spec.getBlobStoreLocationId());
+      }
+    } else if (spec.getLocationId() != null) {
+      /* find the closest location to the compute nodes */
+      ComputeServiceContext compute = ComputeServiceContextBuilder.build(spec);
+      try {
+        Set<String> computeIsoCodes = null;
+        for (Location loc : compute.getComputeService().listAssignableLocations()) {
+          if (loc.getId().equals(spec.getLocationId())) {
+            computeIsoCodes = loc.getIso3166Codes();
+            break;
+          }
+        }
+        if (computeIsoCodes == null) {
+          LOG.warn("Invalid compute location ID '{}'. " +
+              "Using default blob store location.", spec.getLocationId());
+        } else {
+          for (Location loc : context.getBlobStore().listAssignableLocations()) {
+            if (containsAny(loc.getIso3166Codes(), computeIsoCodes)) {
+              defaultLocation = loc;
+              break;
+            }
+          }
+        }
+      } finally {
+        /* the compute context is only needed for the location lookup */
+        compute.close();
+      }
+    }
+  }
+
+  /** Returns true if at least one element of {@code set1} is contained in {@code set2}. */
+  private <T> boolean containsAny(Set<T> set1, Set<T> set2) {
+    for (T el : set1) {
+      if (set2.contains(el)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Uploads the local file designated by the given URI string unless a blob
+   * with the same file name is already cached.
+   *
+   * @throws URISyntaxException if {@code localUri} is not a valid URI
+   * @throws IOException if the file cannot be found
+   */
+  public synchronized void putIfAbsent(String localUri) throws URISyntaxException, IOException {
+    putIfAbsent(new URI(localUri));
+  }
+
+  /**
+   * Uploads the local file designated by the given URI unless a blob with the
+   * same file name is already cached.
+   *
+   * @throws IOException if the file cannot be found
+   */
+  public synchronized void putIfAbsent(URI uri) throws IOException {
+    try {
+      putIfAbsent(new File(uri));
+    } catch (FileNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Uploads the given file, using its name as the blob name, unless a blob
+   * with that name is already cached.
+   *
+   * @throws FileNotFoundException if the file cannot be opened for reading
+   */
+  public synchronized void putIfAbsent(File file) throws FileNotFoundException {
+    InputStream in = new FileInputStream(file);
+    try {
+      putIfAbsent(file.getName(), in, file.length());
+    } finally {
+      /* always release the file handle, also when the blob already existed
+         and the stream was therefore never consumed */
+      try {
+        in.close();
+      } catch (IOException e) {
+        LOG.warn("Unable to close '" + file + "'", e);
+      }
+    }
+  }
+
+  /**
+   * Uploads the content of the stream as a blob with the given name unless a
+   * blob with that name already exists in the cache container. The container
+   * is created on the first call. The stream is not closed by this method.
+   *
+   * @param name the blob name
+   * @param in the content to upload
+   * @param contentLength the number of bytes available from {@code in}
+   */
+  public synchronized void putIfAbsent(String name, InputStream in, long contentLength) {
+    allocateContainer();
+
+    BlobStore store = context.getBlobStore();
+    if (!store.blobExists(container, name)) {
+      LOG.info("Uploading '{}' to '{}' blob cache.", name, container);
+
+      Blob blob = store.newBlob(name);
+      blob.setPayload(in);
+      blob.getMetadata().getContentMetadata().setContentLength(contentLength);
+      store.putBlob(container, blob);
+    }
+  }
+
+  /**
+   * Builds a statement that downloads the named blob into the {@code target}
+   * directory through a signed request.
+   *
+   * @throws IOException if the blob is not in the cache
+   */
+  public synchronized Statement getAsSaveToStatement(String target, String name) throws IOException {
+    HttpRequest req = getSignedRequest(name);
+    return new SaveHttpResponseTo(target, name, req.getMethod(), req.getEndpoint(), req.getHeaders());
+  }
+
+  /**
+   * Same as {@link #getAsSaveToStatement(String, String)}, using the file
+   * name of the given local file URI as the blob name.
+   *
+   * @throws IOException if the blob is not in the cache
+   */
+  public synchronized Statement getAsSaveToStatement(String target, URI uri) throws IOException {
+    return getAsSaveToStatement(target, new File(uri).getName());
+  }
+
+  /**
+   * Returns a signed GET request for the named blob.
+   *
+   * @throws IOException if the blob is not in the cache
+   */
+  public synchronized HttpRequest getSignedRequest(String blobName) throws IOException {
+    checkExistsBlob(blobName);
+    return context.getSigner().signGetBlob(container, blobName);
+  }
+
+  /** Fails with an IOException when no container was allocated yet or the blob is missing. */
+  private void checkExistsBlob(String name) throws IOException {
+    if (container == null || !context.getBlobStore().blobExists(container, name)) {
+      throw new IOException("Blob not found: " + container + ":" + name);
+    }
+  }
+
+  /** Lazily creates the cache container on first use. */
+  private void allocateContainer() {
+    if (container == null) {
+      container = generateRandomContainerName();
+    }
+  }
+
+  /**
+   * Creates a container with a random 12 character lower-case alphanumeric
+   * name, retrying with a new name until creation succeeds.
+   */
+  private String generateRandomContainerName() {
+    String candidate;
+    do {
+      /* use a fixed locale so that the name stays ASCII whatever the
+         default locale is (e.g. Turkish would map 'I' to a dotless i) */
+      candidate = RandomStringUtils.randomAlphanumeric(12)
+          .toLowerCase(java.util.Locale.ENGLISH);
+    } while (!context.getBlobStore().createContainerInLocation(defaultLocation, candidate));
+    LOG.info("Created blob cache container '{}' located in '{}'", candidate, defaultLocation);
+    return candidate;
+  }
+
+  /** Deletes the cache container, if one was created, and closes the blob store context. */
+  private void dropAndClose() {
+    try {
+      if (container != null) {
+        LOG.info("Removing blob cache '{}'", container);
+        context.getBlobStore().deleteContainer(container);
+      }
+    } finally {
+      /* release the context even when the container could not be deleted */
+      context.close();
+    }
+  }
+}
Added: incubator/whirr/trunk/core/src/test/java/org/apache/whirr/util/integration/BlobCacheTest.java
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/core/src/test/java/org/apache/whirr/util/integration/BlobCacheTest.java?rev=1094718&view=auto
==============================================================================
--- incubator/whirr/trunk/core/src/test/java/org/apache/whirr/util/integration/BlobCacheTest.java (added)
+++ incubator/whirr/trunk/core/src/test/java/org/apache/whirr/util/integration/BlobCacheTest.java Mon Apr 18 20:02:02 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.whirr.util.integration;
+
+import com.google.common.io.Files;
+import com.jcraft.jsch.JSchException;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.whirr.ClusterSpec;
+import org.apache.whirr.util.BlobCache;
+import org.jclouds.http.HttpRequest;
+import org.jclouds.scriptbuilder.domain.OsFamily;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Integration tests for {@link BlobCache}. The cluster spec is loaded from
+ * whirr-core-test.properties, which takes the provider, identity and
+ * credential from the whirr.test.* system properties.
+ */
+public class BlobCacheTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlobCacheTest.class);
+
+  private Configuration getTestConfiguration() throws ConfigurationException {
+    return new PropertiesConfiguration("whirr-core-test.properties");
+  }
+
+  private ClusterSpec getTestClusterSpec() throws Exception {
+    return ClusterSpec.withTemporaryKeys(getTestConfiguration());
+  }
+
+  /**
+   * Uploads a temporary file and checks that the content served for the
+   * signed request matches what was written.
+   */
+  @Test
+  public void testUploadFileToBlobCache() throws Exception {
+    String expected = "dummy content";
+    File tempFile = createTemporaryFile(expected);
+
+    ClusterSpec spec = getTestClusterSpec();
+    BlobCache cache = BlobCache.getInstance(spec);
+
+    try {
+      cache.putIfAbsent(tempFile);
+
+      HttpRequest req = cache.getSignedRequest(tempFile.getName());
+      assertThat(readContent(req), is(expected));
+
+      /* render download statement for visual test inspection */
+      LOG.info(cache.getAsSaveToStatement("/tmp",
+          tempFile.getName()).render(OsFamily.UNIX));
+
+    } finally {
+      BlobCache.dropAndCloseAll();
+    }
+  }
+
+  /**
+   * Checks that a blob store location matching the compute location is
+   * selected when only the compute location ID is configured.
+   */
+  @Test
+  public void testSelectBestLocation() throws Exception {
+    ClusterSpec spec = getTestClusterSpec();
+    String provider = spec.getProvider();
+    /* constant-first comparison so that a missing provider skips the test
+       instead of failing with a NullPointerException */
+    if (!"aws".equals(provider) && !"aws-ec2".equals(provider)) {
+      return; // this test can be executed only on amazon but the internal
+              // location selection mechanism should work for any cloud provider
+    }
+    spec.setLocationId("eu-west-1");
+
+    try {
+      BlobCache cache = BlobCache.getInstance(spec);
+      assertThat(cache.getLocation().getId(), is("EU"));
+    } finally {
+      /* release the blob store context and the registry entry */
+      BlobCache.dropAndCloseAll();
+    }
+  }
+
+  /** Executes the signed request with an HTTP client and returns the response body. */
+  private String readContent(HttpRequest req) throws IOException {
+    HttpClient client = new DefaultHttpClient();
+    try {
+      HttpGet get = new HttpGet(req.getEndpoint());
+
+      /* copy every signed header, including repeated ones, onto the request */
+      Map<String, Collection<String>> headers = req.getHeaders().asMap();
+      for (Map.Entry<String, Collection<String>> header : headers.entrySet()) {
+        for (String value : header.getValue()) {
+          get.addHeader(header.getKey(), value);
+        }
+      }
+
+      ResponseHandler<String> handler = new BasicResponseHandler();
+      return client.execute(get, handler);
+
+    } finally {
+      client.getConnectionManager().shutdown();
+    }
+  }
+
+  /** Creates a temporary file holding the given content; it is deleted when the JVM exits. */
+  private File createTemporaryFile(String content) throws IOException {
+    File tempFile = File.createTempFile("whirr", ".txt");
+    tempFile.deleteOnExit();
+    Files.write(content, tempFile, Charset.defaultCharset());
+    return tempFile;
+  }
+
+}
Added: incubator/whirr/trunk/core/src/test/resources/whirr-core-test.properties
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/core/src/test/resources/whirr-core-test.properties?rev=1094718&view=auto
==============================================================================
--- incubator/whirr/trunk/core/src/test/resources/whirr-core-test.properties (added)
+++ incubator/whirr/trunk/core/src/test/resources/whirr-core-test.properties Mon Apr 18 20:02:02 2011
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+whirr.cluster-name=coretest
+whirr.provider=${sys:whirr.test.provider}
+whirr.identity=${sys:whirr.test.identity}
+whirr.credential=${sys:whirr.test.credential}