You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2012/09/07 01:31:50 UTC

[13/14] git commit: Add initial support for Caringo CAStor object store as backing store for S3 API

Add initial support for Caringo CAStor object store as backing store for S3 API

Signed-off-by: Chiradeep Vittal <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/5d208a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/5d208a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/5d208a5c

Branch: refs/heads/javelin
Commit: 5d208a5c95bfd6f22332bf1f5af8c4a13749c49a
Parents: 5ae15f8
Author: Jamshid Afshar <ja...@caringo.com>
Authored: Wed Sep 5 23:35:09 2012 -0700
Committer: Chiradeep Vittal <ch...@apache.org>
Committed: Wed Sep 5 23:35:09 2012 -0700

----------------------------------------------------------------------
 NOTICE                                             |   37 ++
 awsapi/pom.xml                                     |    5 +
 .../com/cloud/bridge/io/S3CAStorBucketAdapter.java |  479 +++++++++++++++
 awsapi/src/com/cloud/bridge/model/SHost.java       |    4 +-
 .../service/controller/s3/ServiceProvider.java     |   23 +-
 .../com/cloud/bridge/service/core/s3/S3Engine.java |    6 +
 6 files changed, 551 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 9c124ab..13f608e 100644
--- a/NOTICE
+++ b/NOTICE
@@ -657,3 +657,40 @@
         SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
         
         ================================================================================
+
+
+  For
+    CAStorSDK.jar
+
+        This product includes CAStorSDK (http://www.caringo.com/)
+        under the BSD License
+
+        ================================================================================
+        Copyright (c) 2009, Caringo, Inc.
+        All rights reserved.
+        
+        Redistribution and use in source and binary forms, with or without
+        modification, are permitted provided that the following conditions are met:
+        
+            * Redistributions of source code must retain the above copyright notice,
+              this list of conditions and the following disclaimer.
+        
+            * Redistributions in binary form must reproduce the above copyright notice,
+              this list of conditions and the following disclaimer in the documentation
+              and/or other materials provided with the distribution.
+        
+            * Neither the name of Caringo, Inc. nor the names of its contributors may
+              be used to endorse or promote products derived from this software without
+              specific prior written permission.
+        
+        THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+        ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+        WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+        DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+        ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+        (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+        LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+        ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+        (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+        SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+        ================================================================================

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/pom.xml
----------------------------------------------------------------------
diff --git a/awsapi/pom.xml b/awsapi/pom.xml
index cf91033..cc30062 100644
--- a/awsapi/pom.xml
+++ b/awsapi/pom.xml
@@ -94,6 +94,11 @@
       <artifactId>jasypt</artifactId>
       <version>${cs.jasypt.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.caringo.client</groupId>
+      <artifactId>CAStorSDK</artifactId>
+      <version>1.3.1-CS40</version>
+    </dependency>
   </dependencies>
   <build>
     <defaultGoal>install</defaultGoal>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java
----------------------------------------------------------------------
diff --git a/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java b/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java
new file mode 100644
index 0000000..ad6f6cd
--- /dev/null
+++ b/awsapi/src/com/cloud/bridge/io/S3CAStorBucketAdapter.java
@@ -0,0 +1,479 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.bridge.io;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.bridge.service.core.s3.S3BucketAdapter;
+import com.cloud.bridge.service.core.s3.S3MultipartPart;
+import com.cloud.bridge.service.exception.ConfigurationException;
+import com.cloud.bridge.service.exception.FileNotExistException;
+import com.cloud.bridge.service.exception.InternalErrorException;
+import com.cloud.bridge.service.exception.OutOfStorageException;
+import com.cloud.bridge.service.exception.UnsupportedException;
+import com.cloud.bridge.util.StringHelper;
+import com.cloud.bridge.util.OrderedPair;
+
+import com.caringo.client.locate.Locator;
+import com.caringo.client.locate.StaticLocator;
+import com.caringo.client.locate.ZeroconfLocator;
+import com.caringo.client.ResettableFileInputStream;
+import com.caringo.client.ScspClient;
+import com.caringo.client.ScspExecutionException;
+import com.caringo.client.ScspHeaders;
+import com.caringo.client.ScspQueryArgs;
+import com.caringo.client.ScspResponse;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.Header;
+
+/**
+ * Creates an SCSP client to a CAStor cluster, configured in "storage.root",
+ * and use CAStor as the back-end storage instead of a file system.
+ */
+public class S3CAStorBucketAdapter implements S3BucketAdapter {
+    protected final static Logger s_logger = Logger.getLogger(S3CAStorBucketAdapter.class);
+
+    private static final int HTTP_OK = 200;
+    private static final int HTTP_CREATED = 201;
+    private static final int HTTP_UNSUCCESSFUL = 300;
+    private static final int HTTP_PRECONDITION_FAILED = 412;
+
+    // For ScspClient
+    private static final int DEFAULT_SCSP_PORT = 80;
+    private static final int DEFAULT_MAX_POOL_SIZE = 50;
+    private static final int DEFAULT_MAX_RETRIES = 5;
+    private static final int CONNECTION_TIMEOUT = 60 * 1000; // Request activity timeout - 1 minute
+    private static final int CM_IDLE_TIMEOUT = 60 * 1000; // HttpConnectionManager idle timeout - 1 minute
+    private static final int LOCATOR_RETRY_TIMEOUT = 0; // StaticLocator pool retry timeout
+
+    private ScspClient _scspClient;  // talks to CAStor cluster
+    private Locator _locator;         // maintains list of CAStor nodes
+    private String _domain;          // domain where all CloudStack streams will live
+
+    private synchronized ScspClient myClient(String mountedRoot) {
+        if (_scspClient!=null) {
+            return _scspClient;
+        }
+        // The castor cluster is specified either by listing the ip addresses of some nodes, or
+        // by specifying "zeroconf=" and the cluster's mdns name -- this is "cluster" in castor's node.cfg.
+        // The "domain" to store streams can be specified. If not specified, streams will be written
+        // without a "domain" query arg, so they will go into the castor default domain.
+        // The port is optional and must be at the end of the config string, defaults to 80.
+        // Examples: "castor 172.16.78.130 172.16.78.131 80", "castor 172.16.78.130 domain=mycluster.example.com", 
+        // "castor zeroconf=mycluster.example.com domain=mycluster.example.com 80"
+        String[] cfg = mountedRoot.split(" ");
+        int numIPs = cfg.length-1;
+        String possiblePort = cfg[cfg.length-1];
+        int castorPort = DEFAULT_SCSP_PORT;
+        try {
+            castorPort = Integer.parseInt(possiblePort);
+            --numIPs;
+        } catch (NumberFormatException nfe) {
+            // okay, it's an ip address, not a port number
+        }
+        if (numIPs <= 0) {
+            throw new ConfigurationException("No CAStor nodes specified in '" + mountedRoot + "'");
+        }
+        HashSet<String> ips = new HashSet<String>();
+        String clusterName = null;
+        for ( int i = 0; i < numIPs; ++i ) {
+            String option = cfg[i+1]; // ip address or zeroconf=mycluster.example.com or domain=mydomain.example.com
+            if (option.toLowerCase().startsWith("zeroconf=")) {
+                String[] confStr = option.split("=");
+                if (confStr.length != 2) {
+                    throw new ConfigurationException("Could not parse cluster name from '" + option + "'");
+                }
+                clusterName = confStr[1];
+            } else if (option.toLowerCase().startsWith("domain=")) {
+                String[] confStr = option.split("=");
+                if (confStr.length != 2) {
+                    throw new ConfigurationException("Could not parse domain name from '" + option + "'");
+                }
+                _domain = confStr[1];
+            } else {
+                ips.add(option);
+            }
+        }
+        if (clusterName == null && ips.isEmpty()) {
+            throw new ConfigurationException("No CAStor nodes specified in '" + mountedRoot + "'");
+        }
+        String[] castorNodes = ips.toArray(new String[0]);  // list of configured nodes
+        if (clusterName == null) {
+            try {
+                _locator = new StaticLocator(castorNodes, castorPort, LOCATOR_RETRY_TIMEOUT);
+                _locator.start();
+            } catch (IOException e) {
+                throw new ConfigurationException("Could not create CAStor static locator for '" + 
+                                                 Arrays.toString(castorNodes) + "'");
+            }
+        } else {
+            try {
+                clusterName = clusterName.replace(".", "_"); // workaround needed for CAStorSDK 1.3.1
+                _locator = new ZeroconfLocator(clusterName);
+                _locator.start();
+            } catch (IOException e) {
+                throw new ConfigurationException("Could not create CAStor zeroconf locator for '" + clusterName + "'");
+            }
+        }
+        try {
+            s_logger.info("CAStor client starting: " + (_domain==null ? "default domain" : "domain " + _domain) + " " + (clusterName==null ? Arrays.toString(castorNodes) : clusterName) + " :" + castorPort);
+            _scspClient = new ScspClient(_locator, castorPort, DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_RETRIES, CONNECTION_TIMEOUT, CM_IDLE_TIMEOUT);
+            _scspClient.start();
+        } catch (Exception e) {
+            s_logger.error("Unable to create CAStor client for '" + mountedRoot + "': " + e.getMessage(), e);
+            throw new ConfigurationException("Unable to create CAStor client for '" + mountedRoot + "': " + e);
+        }
+        return _scspClient;
+    }
+
+    private String castorURL(String mountedRoot, String bucket, String fileName) {
+        // TODO: Replace this method with access to ScspClient's Locator,
+        // or add read method that returns the body as an unread
+        // InputStream for use by loadObject() and loadObjectRange().
+
+        myClient(mountedRoot); // make sure castorNodes and castorPort initialized
+        InetSocketAddress nodeAddr = _locator.locate();
+        if (nodeAddr == null) {
+            throw new ConfigurationException("Unable to locate CAStor node with locator " + _locator);
+        }
+        InetAddress nodeInetAddr = nodeAddr.getAddress();
+        if (nodeInetAddr == null) {
+            _locator.foundDead(nodeAddr);
+            throw new ConfigurationException("Unable to resolve CAStor node name '" + nodeAddr.getHostName() +
+                                             "' to IP address");
+        }
+        return "http://" + nodeInetAddr.getHostAddress() + ":" + nodeAddr.getPort() + "/" + bucket + "/" + fileName +
+            (_domain==null ? "" : "?domain=" + _domain);
+    }
+
+    private ScspQueryArgs domainQueryArg() {
+        ScspQueryArgs qa = new ScspQueryArgs();
+        if (this._domain != null)
+            qa.setValue("domain", this._domain);
+        return qa;
+    }
+
+    public S3CAStorBucketAdapter() {
+        // TODO: is there any way to initialize CAStor client here, can it
+        // get to config?
+    }
+
+    @Override
+    public void createContainer(String mountedRoot, String bucket) {
+        try {
+            ScspResponse bwResponse = myClient(mountedRoot).write(bucket, new ByteArrayInputStream("".getBytes()), 0, domainQueryArg(), new ScspHeaders());
+            if (bwResponse.getHttpStatusCode() != HTTP_CREATED) {
+                if (bwResponse.getHttpStatusCode() == HTTP_PRECONDITION_FAILED)
+                    s_logger.error("CAStor unable to create bucket " + bucket + " because domain " +
+                                   (this._domain==null ? "(default)" : this._domain) + " does not exist");
+                else
+                    s_logger.error("CAStor unable to create bucket " + bucket + ": " + bwResponse.getHttpStatusCode());
+                throw new OutOfStorageException("CAStor unable to create bucket " + bucket + ": " +
+                                                bwResponse.getHttpStatusCode());
+            }
+        } catch (ScspExecutionException e) {
+            s_logger.error("CAStor unable to create bucket " + bucket, e);
+            throw new OutOfStorageException("CAStor unable to create bucket " + bucket + ": " + e.getMessage());
+        }
+    }
+
+    @Override
+    public void deleteContainer(String mountedRoot, String bucket) {
+        try {
+            ScspResponse bwResponse = myClient(mountedRoot).delete("", bucket, domainQueryArg(), new ScspHeaders());
+            if (bwResponse.getHttpStatusCode() >= HTTP_UNSUCCESSFUL) {
+                s_logger.error("CAStor unable to delete bucket " + bucket + ": " + bwResponse.getHttpStatusCode());
+                throw new OutOfStorageException("CAStor unable to delete bucket " + bucket + ": " +
+                                                bwResponse.getHttpStatusCode());
+            }
+        } catch (ScspExecutionException e) {
+            s_logger.error("CAStor unable to delete bucket " + bucket, e);
+            throw new OutOfStorageException("CAStor unable to delete bucket " + bucket + ": " + e.getMessage());
+        }
+    }
+
+    @Override
+    public String saveObject(InputStream is, String mountedRoot, String bucket, String fileName)
+    {
+        // TODO: Currently this writes the object to a temporary file,
+        // so that the MD5 can be computed and so that we have the
+        // stream length needed by this version of CAStor SDK. Will
+        // change to calculate MD5 while streaming to CAStor and to
+        // either pass Content-length to this method or use newer SDK
+        // that doesn't require it.
+
+        FileOutputStream fos = null;
+        MessageDigest md5 = null;
+
+        try {
+            md5 = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            s_logger.error("Unexpected exception " + e.getMessage(), e);
+            throw new InternalErrorException("Unable to get MD5 MessageDigest", e);
+        }
+
+        File spoolFile = null;
+        try {
+            spoolFile = File.createTempFile("castor", null);
+        } catch (IOException e) {
+            s_logger.error("Unexpected exception creating temporary CAStor spool file: " + e.getMessage(), e);
+            throw new InternalErrorException("Unable to create temporary CAStor spool file", e);
+        }
+        try {
+            String retVal;
+            int streamLen = 0;
+            try {
+                fos = new FileOutputStream(spoolFile);
+                byte[] buffer = new byte[4096];
+                int len = 0;
+                while( (len = is.read(buffer)) > 0) {
+                    fos.write(buffer, 0, len);
+                    streamLen = streamLen + len;
+                    md5.update(buffer, 0, len);
+
+                }
+                //Convert MD5 digest to (lowercase) hex String
+                retVal = StringHelper.toHexString(md5.digest());
+
+            } catch(IOException e) {
+                s_logger.error("Unexpected exception " + e.getMessage(), e);
+                throw new OutOfStorageException(e);
+            } finally {
+                try {
+                    if (null != fos)
+                        fos.close();
+                } catch( Exception e ) {
+                    s_logger.error("Can't close CAStor spool file " +
+                                   spoolFile.getAbsolutePath() + ": " + e.getMessage(), e);
+                    throw new OutOfStorageException("Unable to close CAStor spool file: " + e.getMessage(), e);
+                }
+            }
+
+            try {
+                ScspResponse bwResponse =
+                    myClient(mountedRoot).write(bucket + "/" + fileName,
+                                                new ResettableFileInputStream(spoolFile), streamLen,
+                                                domainQueryArg(), new ScspHeaders());
+                if (bwResponse.getHttpStatusCode() >= HTTP_UNSUCCESSFUL) {
+                    s_logger.error("CAStor write responded with error " + bwResponse.getHttpStatusCode());
+                    throw new OutOfStorageException("Unable to write object to CAStor " +
+                                                    bucket + "/" + fileName + ": " + bwResponse.getHttpStatusCode());
+                }
+            } catch (ScspExecutionException e) {
+                s_logger.error("Unable to write object to CAStor " + bucket + "/" + fileName, e);
+                throw new OutOfStorageException("Unable to write object to CAStor " + bucket + "/" + fileName + ": " +
+                                                e.getMessage());
+            } catch (IOException ie) {
+                s_logger.error("Unable to write object to CAStor " + bucket + "/" + fileName, ie);
+                throw new OutOfStorageException("Unable to write object to CAStor " + bucket + "/" + fileName + ": " +
+                                                ie.getMessage());
+            }
+            return retVal;
+        } finally {
+            try {
+                if (!spoolFile.delete()) {
+                    s_logger.error("Failed to delete CAStor spool file " + spoolFile.getAbsolutePath());
+                }
+            } catch (SecurityException e) {
+                s_logger.error("Unable to delete CAStor spool file " + spoolFile.getAbsolutePath(), e);
+            }
+        }
+    }
+
+    /**
+     * From a list of files (each being one part of the multipart upload), concatentate all files into a single
+     * object that can be accessed by normal S3 calls.    This function could take a long time since a multipart is
+     * allowed to have upto 10,000 parts (each 5 gib long).      Amazon defines that while this operation is in progress
+     * whitespace is sent back to the client inorder to keep the HTTP connection alive.
+     *
+     * @param mountedRoot - where both the source and dest buckets are located
+     * @param destBucket - resulting location of the concatenated objects
+     * @param fileName - resulting file name of the concatenated objects
+     * @param sourceBucket - special bucket used to save uploaded file parts
+     * @param parts - an array of file names in the sourceBucket
+     * @param client - if not null, then keep the servlet connection alive while this potentially long concatentation takes place
+     * @return OrderedPair with the first value the MD5 of the final object, and the second value the length of the final object
+     */
+    @Override
+    public OrderedPair<String,Long> concatentateObjects(String mountedRoot, String destBucket, String fileName, String sourceBucket, S3MultipartPart[] parts, OutputStream client)
+    {
+        // TODO
+        throw new UnsupportedException("Multipart upload support not yet implemented in CAStor plugin");
+
+        /*
+        MessageDigest md5;
+        long totalLength = 0;
+
+        try {
+            md5 = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            s_logger.error("Unexpected exception " + e.getMessage(), e);
+            throw new InternalErrorException("Unable to get MD5 MessageDigest", e);
+        }
+
+        File file = new File(getBucketFolderDir(mountedRoot, destBucket) + File.separatorChar + fileName);
+        try {
+            // -> when versioning is off we need to rewrite the file contents
+            file.delete();
+            file.createNewFile();
+
+            final FileOutputStream fos = new FileOutputStream(file);
+            byte[] buffer = new byte[4096];
+
+            // -> get the input stream for the next file part
+            for( int i=0; i < parts.length; i++ )
+            {
+               DataHandler nextPart = loadObject( mountedRoot, sourceBucket, parts[i].getPath());
+               InputStream is = nextPart.getInputStream();
+
+               int len = 0;
+               while( (len = is.read(buffer)) > 0) {
+                   fos.write(buffer, 0, len);
+                   md5.update(buffer, 0, len);
+                   totalLength += len;
+               }
+               is.close();
+
+               // -> after each file write tell the client we are still here to keep connection alive
+               if (null != client) {
+                   client.write( new String(" ").getBytes());
+                   client.flush();
+               }
+            }
+            fos.close();
+            return new OrderedPair<String, Long>(StringHelper.toHexString(md5.digest()), new Long(totalLength));
+            //Create an ordered pair whose first element is the MD4 digest as a (lowercase) hex String
+        }
+        catch(IOException e) {
+            s_logger.error("concatentateObjects unexpected exception " + e.getMessage(), e);
+            throw new OutOfStorageException(e);
+        }
+        */
+    }
+
+    @Override
+    public DataHandler loadObject(String mountedRoot, String bucket, String fileName) {
+        try {
+            return new DataHandler(new URL(castorURL(mountedRoot, bucket, fileName)));
+        } catch (MalformedURLException e) {
+            s_logger.error("Failed to loadObject from CAStor", e);
+            throw new FileNotExistException("Unable to load object from CAStor: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public void deleteObject(String mountedRoot, String bucket, String fileName) {
+        String filePath = bucket + "/" + fileName;
+        try {
+            ScspResponse bwResponse = myClient(mountedRoot).delete("", filePath, domainQueryArg(), new ScspHeaders());
+            if (bwResponse.getHttpStatusCode() != HTTP_OK) {
+                s_logger.error("CAStor delete object responded with error " + bwResponse.getHttpStatusCode());
+                throw new OutOfStorageException("CAStor unable to delete object " + filePath + ": " +
+                                                bwResponse.getHttpStatusCode());
+            }
+        } catch (ScspExecutionException e) {
+            s_logger.error("CAStor unable to delete object " + filePath, e);
+            throw new OutOfStorageException("CAStor unable to delete object " + filePath + ": " + e.getMessage());
+        }
+    }
+
+    public class ScspDataSource implements DataSource {
+        GetMethod method;
+        public ScspDataSource(GetMethod m) {
+            method = m;
+        }
+        @Override
+        public String getContentType() {
+            Header h = method.getResponseHeader("Content-type");
+            return h==null ? null : h.getValue();
+        }
+        @Override
+        public InputStream getInputStream() throws IOException {
+            try {
+                return method.getResponseBodyAsStream();
+            } catch (Exception e) {
+                s_logger.error("CAStor loadObjectRange getInputStream error", e);
+                return null;
+            }
+        }
+        @Override
+        public String getName() {
+            assert(false);
+            return null;
+        }
+        @Override
+        public OutputStream getOutputStream() throws IOException {
+            assert(false);
+            return null;
+        }
+    }
+
+    @Override
+    public DataHandler loadObjectRange(String mountedRoot, String bucket, String fileName, long startPos, long endPos) {
+        try {
+            HttpClient httpClient = new HttpClient();
+            // Create a method instance.
+            GetMethod method = new GetMethod(castorURL(mountedRoot, bucket, fileName));
+            method.addRequestHeader("Range", "bytes=" + startPos + "-" + endPos);
+            int statusCode = httpClient.executeMethod(method);
+            if (statusCode < HTTP_OK || statusCode >= HTTP_UNSUCCESSFUL) {
+                s_logger.error("CAStor loadObjectRange response: "+  statusCode);
+                throw new FileNotExistException("CAStor loadObjectRange response: " + statusCode);
+            }
+            return new DataHandler(new ScspDataSource(method));
+        } catch (Exception e) {
+            s_logger.error("CAStor loadObjectRange failure", e);
+            throw new FileNotExistException("CAStor loadObjectRange failure: " + e);
+        }
+    }
+
+    @Override
+    public String getBucketFolderDir(String mountedRoot, String bucket) {
+        // This method shouldn't be needed and doesn't need to use
+        // mountedRoot (which is CAStor config values here), right?
+        String bucketFolder = getBucketFolderName(bucket);
+        return bucketFolder;
+    }
+
+    private String getBucketFolderName(String bucket) {
+        // temporary
+        String name = bucket.replace(' ', '_');
+        name = bucket.replace('\\', '-');
+        name = bucket.replace('/', '-');
+
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/model/SHost.java
----------------------------------------------------------------------
diff --git a/awsapi/src/com/cloud/bridge/model/SHost.java b/awsapi/src/com/cloud/bridge/model/SHost.java
index 4ed5b7e..a6a2f58 100644
--- a/awsapi/src/com/cloud/bridge/model/SHost.java
+++ b/awsapi/src/com/cloud/bridge/model/SHost.java
@@ -24,9 +24,11 @@ public interface SHost  {
 	
 	public static final int STORAGE_HOST_TYPE_LOCAL = 0;
 	public static final int STORAGE_HOST_TYPE_NFS = 1;
+    public static final int STORAGE_HOST_TYPE_CASTOR = 2;
 	public static enum StorageHostType {
 	    STORAGE_HOST_TYPE_LOCAL, //0
-	    STORAGE_HOST_TYPE_NFS //1
+        STORAGE_HOST_TYPE_NFS, //1
+        STORAGE_HOST_TYPE_CASTOR //2
 	}
 /*	private Long id;
 	

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java
----------------------------------------------------------------------
diff --git a/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java b/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java
index 2f1791e..2ddbbf2 100644
--- a/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java
+++ b/awsapi/src/com/cloud/bridge/service/controller/s3/ServiceProvider.java
@@ -243,7 +243,13 @@ public class ServiceProvider {
 		//PersistContext.flush();
 
 		String localStorageRoot = properties.getProperty("storage.root");
-		if (localStorageRoot != null) setupLocalStorage(localStorageRoot);
+        if (localStorageRoot != null) {
+            if (localStorageRoot.toLowerCase().startsWith("castor")) {
+                setupCAStorStorage(localStorageRoot);
+            } else {
+                setupLocalStorage(localStorageRoot);
+            }
+        }
 
 		multipartDir = properties.getProperty("storage.multipartDir");
 		
@@ -318,7 +324,20 @@ public class ServiceProvider {
 		}
 	}
 
-	public void shutdown() {
+    private void setupCAStorStorage(String storageRoot) {
+		SHostVO shost = shostDao.getLocalStorageHost(mhost.getId(), storageRoot);
+		if(shost == null) {
+			shost = new SHostVO();
+			shost.setMhost(mhost);
+			shost.setMhostid(mhost.getId());
+			shost.setHostType(SHost.STORAGE_HOST_TYPE_CASTOR);
+			shost.setHost(NetHelper.getHostName());
+			shost.setExportRoot(storageRoot);
+			shostDao.persist(shost);
+		}
+    }
+
+   public void shutdown() {
 		timer.cancel();
 
 		if(logger.isInfoEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/5d208a5c/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java
----------------------------------------------------------------------
diff --git a/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java b/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java
index a117d13..916c51d 100644
--- a/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java
+++ b/awsapi/src/com/cloud/bridge/service/core/s3/S3Engine.java
@@ -39,6 +39,7 @@ import org.apache.log4j.Logger;
 import org.json.simple.parser.ParseException;
 
 import com.cloud.bridge.io.S3FileSystemBucketAdapter;
+import com.cloud.bridge.io.S3CAStorBucketAdapter;
 import com.cloud.bridge.model.BucketPolicyVO;
 import com.cloud.bridge.model.MHostMountVO;
 import com.cloud.bridge.model.MHostVO;
@@ -115,6 +116,7 @@ public class S3Engine {
     
     public S3Engine() {
     	bucketAdapters.put(SHost.STORAGE_HOST_TYPE_LOCAL, new S3FileSystemBucketAdapter());
+        bucketAdapters.put(SHost.STORAGE_HOST_TYPE_CASTOR, new S3CAStorBucketAdapter());
     }
     
     
@@ -1398,6 +1400,10 @@ public class S3Engine {
 			return new OrderedPair<SHostVO, String>(shost, shost.getExportRoot());
 		}
 		
+        if(shost.getHostType() == SHost.STORAGE_HOST_TYPE_CASTOR ) {
+            return new OrderedPair<SHostVO, String>(shost, shost.getExportRoot());
+        }
+
 		MHostMountVO mount = mountDao.getHostMount(ServiceProvider.getInstance().getManagementHostId(), shost.getId());
 		if(mount != null) {
 			return new OrderedPair<SHostVO, String>(shost, mount.getMountPath());