You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stanbol.apache.org by su...@apache.org on 2012/06/01 17:48:25 UTC

svn commit: r1345253 [1/2] - in /incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file: ./ serializer/

Author: suat
Date: Fri Jun  1 15:48:24 2012
New Revision: 1345253

URL: http://svn.apache.org/viewvc?rev=1345253&view=rev
Log:
(Missing files for the previous commit)

STANBOL-498: File based implementation of Contenthub store. 

For the time being this commit covers the requirements for file system based implementation of the Store interface defined in STANBOL-498.

Revision management:
When a content item is stored or deleted its revision is set as the system time at the time of the operation. Revisions regarding the content items are stored in an Apache Derby table. 

Revision management is thought to be used by the indexing part of Contenthub in a way that different semantic implementations would query for the changes in the store as of a specific revision and update the index accordingly in an automatic way. In case there would be large number of changes in the store, store interface provides retrieval of changes in an iterative way by giving the suitable offset and batch size values.

Serialization / Deserialization:
For the time being, content items are serialized within a directory as zip files. Currently there are (de)serializers for TripleCollection and Blob parts of content items. Each such part become an entry within the zip file. It is possible register (de)serializers to the environment for specific types of content parts.

Other:
This implementation also keeps track of the data to be presented in the HTML interface i.e the recently stored content items and some basic information about them such as enhancement count, mimetype, etc.

Added:
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/ChangeSetImpl.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileRevisionManager.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStore.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStoreDBManager.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobDeserializerProvider.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobSerializerProvider.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializer.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializerProvider.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializer.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializerProvider.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/IndexedGraphDeserializerProvider.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/IndexedMGraphDeserializerProvider.java
    incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/TripleCollectionSerializerProvider.java

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/ChangeSetImpl.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/ChangeSetImpl.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/ChangeSetImpl.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/ChangeSetImpl.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file;
+
+import java.util.Set;
+
+import org.apache.clerezza.rdf.core.UriRef;
+import org.apache.stanbol.contenthub.servicesapi.store.ChangeSet;
+import org.apache.stanbol.contenthub.servicesapi.store.Store;
+
+public class ChangeSetImpl implements ChangeSet {
+    private long from;
+    private long to;
+    private Set<UriRef> changedUris;
+    private Store store;
+
+    @Override
+    public long fromRevision() {
+        return from;
+    }
+
+    @Override
+    public long toRevision() {
+        return to;
+    }
+
+    @Override
+    public Set<UriRef> changed() {
+        return changedUris;
+    }
+
+    @Override
+    public Store getStore() {
+        return store;
+    }
+
+    public void setFrom(long from) {
+        this.from = from;
+    }
+
+    public void setTo(long to) {
+        this.to = to;
+    }
+
+    public void setChangedUris(Set<UriRef> changedUris) {
+        this.changedUris = changedUris;
+    }
+
+    public void setStore(Store store) {
+        this.store = store;
+    }
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileRevisionManager.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileRevisionManager.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileRevisionManager.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.clerezza.rdf.core.UriRef;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.contenthub.servicesapi.store.ChangeSet;
+import org.apache.stanbol.contenthub.servicesapi.store.Store;
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.apache.stanbol.enhancer.servicesapi.ContentItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class aims to manage the revisions regarding to the {@link ContentItem}s stored in a {@link Store}.
+ * The revisions are kept in a single table in the scope of Apache Derby database. The system time is set as
+ * the new revision of the {@link ContentItem} when the {@link #updateRevision(String)} method is called.
+ * 
+ * @author suat
+ * 
+ */
+@Component(immediate = true)
+@Service(value = FileRevisionManager.class)
+public class FileRevisionManager {
+    private static Logger log = LoggerFactory.getLogger(FileRevisionManager.class);
+
+    public static final String REVISION_TABLE_NAME = "content_item_revisions";
+
+    private static final String SELECT_REVISION = "SELECT id,revision FROM " + REVISION_TABLE_NAME
+                                                  + " content_item_revision WHERE id = ?";
+
+    private static final String INSERT_REVISION = "INSERT INTO " + REVISION_TABLE_NAME
+                                                  + " (id, revision) VALUES (?,?)";
+
+    private static final String UPDATE_REVISION = "UPDATE " + REVISION_TABLE_NAME
+                                                  + " SET revision=? WHERE id=?";
+
+    private static final String SELECT_CHANGES = "SELECT id, revision FROM " + REVISION_TABLE_NAME
+                                                 + " WHERE revision > ? ORDER BY revision ASC OFFSET ? ROWS";
+
+    @Reference
+    FileStoreDBManager dbManager;
+
+    /**
+     * Updates revision of the {@link ContentItem} specified with the <code>contentItemID</code> parameter.
+     * The system time set as the new revision number by {@link System#currentTimeMillis()}.
+     * 
+     * @param contentItemID
+     *            ID of the {@link ContentItem} of which revision to be updated
+     * @throws StoreException
+     */
+    public void updateRevision(String contentItemID) throws StoreException {
+        // get connection
+        Connection con = dbManager.getConnection();
+
+        // check existence of record for the given content item id
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        boolean recordExist = false;
+        try {
+            ps = con.prepareStatement(SELECT_REVISION);
+            ps.setString(1, contentItemID);
+            rs = ps.executeQuery();
+            if (rs.next()) {
+                recordExist = true;
+            }
+
+        } catch (SQLException e) {
+            dbManager.closeConnection(con);
+            log.error("Failed to query revision of content item", e);
+            throw new StoreException("Failed to query revision of content item", e);
+        } finally {
+            dbManager.closeResultSet(rs);
+            dbManager.closeStatement(ps);
+        }
+
+        // update the table
+        try {
+            long newRevision = System.currentTimeMillis();
+            if (!recordExist) {
+                log.debug("New revision: {} for the content item: {} will be added", newRevision,
+                    contentItemID);
+                ps = con.prepareStatement(INSERT_REVISION);
+                ps.setString(1, contentItemID);
+                ps.setLong(2, newRevision);
+            } else {
+                log.debug("New revision: {} for the content item: {} will be updated", newRevision,
+                    contentItemID);
+                ps = con.prepareStatement(UPDATE_REVISION);
+                ps.setString(2, contentItemID);
+                ps.setLong(1, newRevision);
+            }
+            int updatedRecordNum = ps.executeUpdate();
+            // exactly one record should be affected
+            if (updatedRecordNum != 1) {
+                log.warn("Unexpected number of updated records: {}, should be 1", updatedRecordNum);
+            }
+        } catch (SQLException e) {
+            log.error("Failed to update revision", e);
+            throw new StoreException("Failed to update revision", e);
+        } finally {
+            dbManager.closeStatement(ps);
+            dbManager.closeConnection(con);
+        }
+    }
+
+    /**
+     * Returns the updates after the given revision number. It returns at most <code>batchSize</code> number
+     * of changes within the returned {@link ChangeSet} object starting from the given <code>offset</code>.
+     * This method does not necessarily return the all changes for the given revision number. If there are
+     * more changes than the batch size for the given version, only batch size number of changes are returned.
+     * Other changes must be obtained by giving the same revision and the suitable offset.
+     * 
+     * @param revision
+     *            Starting revision number for the returned {@link ChangeSet}
+     * @param offset
+     *            Starting number of the changes as of the given <code>revision</code>.
+     * @param batchSize
+     *            Maximum number of changes to be returned
+     * @return a {@link ChangeSet} including the changes in the store
+     * @throws StoreException
+     */
+    public ChangeSet getChanges(long revision, int offset, int batchSize) throws StoreException {
+        ChangeSetImpl changes = new ChangeSetImpl();
+
+        // get connection
+        Connection con = dbManager.getConnection();
+
+        // check existence of record for the given content item id
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        try {
+            ps = con.prepareStatement(SELECT_CHANGES, ResultSet.TYPE_SCROLL_INSENSITIVE,
+                ResultSet.CONCUR_READ_ONLY);
+            ps.setLong(1, revision);
+            ps.setLong(2, offset);
+            ps.setMaxRows(batchSize);
+            rs = ps.executeQuery();
+
+            // set changed uris
+            Set<UriRef> changedUris = new LinkedHashSet<UriRef>();
+            while (rs.next()) {
+                changedUris.add(new UriRef(rs.getString(1)));
+            }
+            changes.setChangedUris(changedUris);
+            // set minimum and maximum revision numbers of the change set
+            rs.first();
+            changes.setFrom(rs.getLong(2));
+            rs.last();
+            changes.setTo(rs.getLong(2));
+
+        } catch (SQLException e) {
+            log.error("Failed to get changes", e);
+            throw new StoreException("Failed to get changes", e);
+        } finally {
+            dbManager.closeResultSet(rs);
+            dbManager.closeStatement(ps);
+            dbManager.closeConnection(con);
+        }
+        return changes;
+    }
+
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStore.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStore.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStore.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStore.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipException;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.clerezza.rdf.core.MGraph;
+import org.apache.clerezza.rdf.core.Triple;
+import org.apache.clerezza.rdf.core.UriRef;
+import org.apache.commons.io.IOUtils;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.commons.indexedgraph.IndexedMGraph;
+import org.apache.stanbol.contenthub.servicesapi.store.ChangeSet;
+import org.apache.stanbol.contenthub.servicesapi.store.Store;
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.apache.stanbol.contenthub.store.file.serializer.ContentPartDeserializer;
+import org.apache.stanbol.contenthub.store.file.serializer.ContentPartSerializer;
+import org.apache.stanbol.enhancer.servicesapi.Blob;
+import org.apache.stanbol.enhancer.servicesapi.ContentItem;
+import org.apache.stanbol.enhancer.servicesapi.ContentItemFactory;
+import org.apache.stanbol.enhancer.servicesapi.EnhancementException;
+import org.apache.stanbol.enhancer.servicesapi.EnhancementJobManager;
+import org.apache.stanbol.enhancer.servicesapi.NoSuchPartException;
+import org.apache.stanbol.enhancer.servicesapi.helper.ContentItemHelper;
+import org.apache.stanbol.enhancer.servicesapi.impl.StreamSource;
+import org.apache.stanbol.enhancer.servicesapi.impl.StringSource;
+import org.apache.stanbol.enhancer.servicesapi.rdf.Properties;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This is the file based implementation of {@link Store} interface. It provides storage, retrieval and
+ * deletion functionalities for {@link ContentItem}s . All {@link ContentItem}s are stored in a single
+ * directory as zip files. {@link ContentItem} parts are serialized into the zip file if there are registered
+ * {@link ContentPartSerializer}s for the specific types of content parts. Similarly, to be able to
+ * deserialize the content parts from the zip, there must be registered {@link ContentPartDeserializer}s
+ * suitable with the types of the parts.
+ * </p>
+ * <p>
+ * Revisions of {@link ContentItem}s submitted to this store is managed through the
+ * {@link FileRevisionManager}. Once a document added, updated or deleted the revision of the
+ * {@link ContentItem} is set as {@link System#currentTimeMillis()}.
+ * </p>
+ * <p>
+ * To be able to populate the HTML interface, this implementation also provides additional metadata regarding
+ * the {@link ContentItem}s. Additional metadata is also stored in the Apache Derby database.
+ * </p>
+ * 
+ * @author suat
+ * @author meric
+ * 
+ */
+@Component(immediate = false)
+@Service
+public class FileStore implements Store {
+    public static final String RECENTLY_ENHANCED_TABLE_NAME = "recently_enhanced_content_items";
+
+    public static final UriRef CONSTRAINTS_URI = new UriRef("org.apache.stanbol.contenthub.constraints");
+
+    public static final UriRef HTMLMETADATA_URI = new UriRef("org.apache.stanbol.contenthub.htmlmetadata");
+
+    public static final String FIELD_MIME_TYPE = "mimetype";
+
+    public static final String FIELD_ENHANCEMENT_COUNT = "enhancementCount";
+
+    public static final String FIELD_TITLE = "title";
+
+    public static final String FIELD_ID = "id";
+
+    private static final String SELECT_RECENTLY_ENHANCED_ITEMS = "SELECT t1.id, mimeType, enhancementCount, title FROM "
+                                                                 + FileRevisionManager.REVISION_TABLE_NAME
+                                                                 + " t1, "
+                                                                 + RECENTLY_ENHANCED_TABLE_NAME
+                                                                 + " t2 WHERE"
+                                                                 + " t1.id=t2."
+                                                                 + FIELD_ID
+                                                                 + " ORDER BY revision DESC"
+                                                                 + " OFFSET ? ROWS";
+
+    private static final String SELECT_RECENTLY_ENHANCED_ITEM = "SELECT " + FIELD_ID + " FROM "
+                                                                + RECENTLY_ENHANCED_TABLE_NAME + " WHERE "
+                                                                + FIELD_ID + "= ?";
+
+    private static final String INSERT_RECENTLY_ENHANCED_ITEM = "INSERT INTO " + RECENTLY_ENHANCED_TABLE_NAME
+                                                                + " (" + FIELD_ID + "," + FIELD_TITLE + ","
+                                                                + FIELD_MIME_TYPE + ","
+                                                                + FIELD_ENHANCEMENT_COUNT + ") VALUES "
+                                                                + " (?,?,?,?)";
+
+    private static final String UPDATE_RECENTLY_ENHANCED_ITEM = "UPDATE " + RECENTLY_ENHANCED_TABLE_NAME
+                                                                + " SET " + FIELD_MIME_TYPE + "=?, "
+                                                                + FIELD_TITLE + "=?,"
+                                                                + FIELD_ENHANCEMENT_COUNT + "=? WHERE "
+                                                                + FIELD_ID + "=?";
+
+    private static final String REMOVE_RECENTLY_ENHANCED_ITEM = "DELETE FROM " + RECENTLY_ENHANCED_TABLE_NAME
+                                                                + " WHERE " + FIELD_ID + "=?";
+
+    private final Logger log = LoggerFactory.getLogger(FileStore.class);
+
+    private File storeFolder;
+
+    @Reference
+    ContentItemFactory contentItemFactory;
+
+    @Reference
+    FileRevisionManager revisionManager;
+
+    @Reference
+    FileStoreDBManager dbManager;
+
+    @Reference
+    private ContentPartSerializer contentPartSerializer;
+
+    @Reference
+    private ContentPartDeserializer contentPartDeserializer;
+
+    @Reference
+    private EnhancementJobManager jobManager;
+
+    @Activate
+    protected void activate(ComponentContext componentContext) throws StoreException {
+        // check store folder
+        String stanbolHome = componentContext.getBundleContext().getProperty("sling.home");
+        String fileStoreName = "filestore";
+        storeFolder = new File(stanbolHome + "/" + fileStoreName);
+        if (!storeFolder.exists()) {
+            storeFolder.mkdirs();
+        }
+    }
+
+    @Override
+    public ContentItem remove(UriRef id) throws StoreException {
+        checkStoreFolder();
+        String urlEncodedId = encodeId(id.getUnicodeString());
+        File f = new File(storeFolder.getPath() + "/" + urlEncodedId + ".zip");
+        ContentItem ci = null;
+        if (f.exists()) {
+            ci = get(id);
+            f.delete();
+            updateTablesForDelete(id);
+        } else {
+            log.warn("There is no file corresponding to the id: {}", id.getUnicodeString());
+        }
+        return ci;
+    }
+
+    private void updateTablesForDelete(UriRef id) throws StoreException {
+        // update revision
+        revisionManager.updateRevision(id.getUnicodeString());
+        // update recently_enhanced table
+        removeFromRecentlyEnhancedTable(id.getUnicodeString());
+    }
+
+    public void removeFromRecentlyEnhancedTable(String contentItemID) throws StoreException {
+        // get connection
+        Connection con = dbManager.getConnection();
+
+        PreparedStatement ps = null;
+        try {
+            ps = con.prepareStatement(REMOVE_RECENTLY_ENHANCED_ITEM);
+            ps.setString(1, contentItemID);
+            int updatedRecordNum = ps.executeUpdate();
+            // exactly one record should be affected
+            if (updatedRecordNum != 1) {
+                log.warn("Unexpected number of updated records: {}, should be 1", updatedRecordNum);
+            }
+        } catch (SQLException e) {
+            log.error("Failed to execute query", e);
+            throw new StoreException("Failed to execute query", e);
+        } finally {
+            dbManager.closeStatement(ps);
+            dbManager.closeConnection(con);
+        }
+    }
+
+    @Override
+    public String put(ContentItem ci) throws StoreException {
+        try {
+            jobManager.enhanceContent(ci);
+        } catch (EnhancementException e) {
+            throw new StoreException(String.format("Failed to enhance given content item with URI: %s",
+                ci.getUri()), e);
+        }
+
+        long enhancementCount = getEnhancementCount(ci);
+        JSONObject htmlMetadata = getHTMLMetadata(ci);
+        htmlMetadata = populateHTMLMetadataPart(ci, htmlMetadata, enhancementCount);
+        updateTablesForPut(htmlMetadata);
+        archiveContentItem(ci);
+        return ci.getUri().getUnicodeString();
+    }
+
+    private long getEnhancementCount(ContentItem ci) {
+        long enhancementCount = 0;
+        Iterator<Triple> it = ci.getMetadata().filter(null, Properties.ENHANCER_EXTRACTED_FROM,
+            new UriRef(ci.getUri().getUnicodeString()));
+        while (it.hasNext()) {
+            it.next();
+            enhancementCount++;
+        }
+        return enhancementCount;
+    }
+
+    private JSONObject getHTMLMetadata(ContentItem ci) throws StoreException {
+        JSONObject htmlMetadata;
+        Blob htmlMetadataPart;
+        try {
+            htmlMetadataPart = ci.getPart(HTMLMETADATA_URI, Blob.class);
+            String jsonString = IOUtils.toString(htmlMetadataPart.getStream());
+            htmlMetadata = new JSONObject(jsonString);
+        } catch (NoSuchPartException e) {
+            // there is no HTML metadata part yet
+            htmlMetadata = new JSONObject();
+        } catch (IOException e) {
+            throw new StoreException("Failed to read HTML metadata part", e);
+        } catch (JSONException e) {
+            throw new StoreException("Failed to construct JSONObject from the HTML metadata part", e);
+        }
+        return htmlMetadata;
+    }
+
+    private JSONObject populateHTMLMetadataPart(ContentItem ci, JSONObject htmlMetadata, long enhancementCount) throws StoreException {
+
+        // populate HTML metadata
+        try {
+            htmlMetadata.put(FIELD_ENHANCEMENT_COUNT, enhancementCount);
+            htmlMetadata.put(FIELD_MIME_TYPE, ci.getMimeType());
+            htmlMetadata.put(FIELD_ID, ci.getUri().getUnicodeString());
+        } catch (JSONException e) {
+            throw new StoreException("Failed to create HTML metadata part", e);
+        }
+
+        // attach the populated part to content item
+        try {
+            Blob htmlMetadataPart = contentItemFactory.createBlob(new StringSource(htmlMetadata.toString()));
+            ci.addPart(HTMLMETADATA_URI, htmlMetadataPart);
+        } catch (IOException e) {
+            throw new StoreException("Failed to create blob for HTML part", e);
+        }
+        return htmlMetadata;
+    }
+
+    private void updateTablesForPut(JSONObject htmlMetadata) throws StoreException {
+        // update revision
+        String title = "";
+        try {
+            title = htmlMetadata.getString(FIELD_TITLE);
+        } catch (JSONException e) {
+            // ignore the exception
+        }
+        try {
+            revisionManager.updateRevision(htmlMetadata.getString(FIELD_ID));
+            updateRecentlyEnhancedItem(htmlMetadata.getString(FIELD_ID), title,
+                htmlMetadata.getString(FIELD_MIME_TYPE), htmlMetadata.getLong(FIELD_ENHANCEMENT_COUNT));
+        } catch (JSONException e) {
+            throw new StoreException("Failed to read HTML metadata of content item", e);
+        }
+    }
+
+    private void updateRecentlyEnhancedItem(String contentItemID,
+                                            String title,
+                                            String mimeType,
+                                            long enhancementCount) throws StoreException {
+        // get connection
+        Connection con = dbManager.getConnection();
+
+        // check existence of record for the given content item id
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        boolean recordExist = false;
+        try {
+            ps = con.prepareStatement(SELECT_RECENTLY_ENHANCED_ITEM);
+            ps.setString(1, contentItemID);
+            rs = ps.executeQuery();
+            if (rs.next()) {
+                recordExist = true;
+            }
+
+        } catch (SQLException e) {
+            dbManager.closeConnection(con);
+            log.error("Failed to query information of content item", e);
+            throw new StoreException("Failed to query information of content item", e);
+        } finally {
+            dbManager.closeResultSet(rs);
+            dbManager.closeStatement(ps);
+        }
+
+        // update the table
+        try {
+            if (!recordExist) {
+                log.debug("Content item: {} will be added to recently_enhanced table", contentItemID);
+                ps = con.prepareStatement(INSERT_RECENTLY_ENHANCED_ITEM);
+                ps.setString(1, contentItemID);
+                ps.setString(2, title);
+                ps.setString(3, mimeType);
+                ps.setLong(4, enhancementCount);
+            } else {
+                log.debug("Content item: {} will be updated in recently_enhanced table", contentItemID);
+                ps = con.prepareStatement(UPDATE_RECENTLY_ENHANCED_ITEM);
+                ps.setString(1, mimeType);
+                ps.setString(2, title);
+                ps.setLong(3, enhancementCount);
+                ps.setString(4, contentItemID);
+            }
+            int updatedRecordNum = ps.executeUpdate();
+            // exactly one record should be affected
+            if (updatedRecordNum != 1) {
+                log.warn("Unexpected number of updated records: {}, should be 1", updatedRecordNum);
+            }
+        } catch (SQLException e) {
+            log.error("Failed to update recently_enhanced table", e);
+            throw new StoreException("Failed to update recently_enhanced table", e);
+        } finally {
+            dbManager.closeStatement(ps);
+            dbManager.closeConnection(con);
+        }
+    }
+
+    private void archiveContentItem(ContentItem ci) throws StoreException {
+        String fileName = encodeId(ci.getUri().getUnicodeString());
+        File file = new File(storeFolder.getPath() + "/" + fileName + ".zip");
+        OutputStream os = null;
+        ZipOutputStream zos = null;
+
+        if (file.exists()) {
+            file.delete();
+        }
+        try {
+            file.createNewFile();
+            os = new FileOutputStream(file);
+            zos = new ZipOutputStream(os);
+        } catch (IOException e) {
+            cleanUp(file, zos, os);
+            throw new StoreException("Failed to initiate the ZipOutputStream", e);
+        }
+
+        JSONObject header = createHeader(ci);
+        populateZipStream(file, os, zos, header, ci);
+
+        try {
+            zos.finish();
+            zos.close();
+            os.close();
+        } catch (IOException e) {
+            throw new StoreException("Failed to finalize the ZipOutputStream", e);
+        }
+    }
+
+    private void populateZipStream(File file,
+                                   OutputStream os,
+                                   ZipOutputStream zos,
+                                   JSONObject header,
+                                   ContentItem ci) throws StoreException {
+        ZipEntry entry = null;
+        try {
+            // add "header" to archive
+            entry = new ZipEntry("header");
+            zos.putNextEntry(entry);
+            zos.write(header.toString().getBytes());
+            zos.closeEntry();
+        } catch (IOException e) {
+            cleanUp(file, zos, os);
+            throw new StoreException("Failed to write header to the ZipOutPutStream", e);
+        }
+
+        // add "metadata" to archive
+        entry = new ZipEntry("metadata");
+        try {
+            zos.putNextEntry(entry);
+            contentPartSerializer.serializeContentPart(zos, ci.getMetadata());
+            zos.closeEntry();
+        } catch (IOException e) {
+            cleanUp(file, zos, os);
+            throw new StoreException("Failed to write content item metadata to ZipOutputStream", e);
+        }
+
+        try {
+            int contentPartNum = header.getInt("contentpartnum");
+            for (int jsonIndex = 0; jsonIndex < contentPartNum; jsonIndex++) {
+                JSONObject fields = (JSONObject) header.get(Integer.toString(jsonIndex));
+                String uri = fields.get("uri").toString();
+                String className = fields.get("superclass").toString();
+                Class<?> klass;
+                try {
+                    klass = Class.forName(className);
+                } catch (ClassNotFoundException e) {
+                    log.warn("Failed to obtain class for class name: {}", className, e);
+                    continue;
+                }
+                entry = new ZipEntry(encodeId(uri));
+                try {
+                    zos.putNextEntry(entry);
+                    contentPartSerializer.serializeContentPart(zos, ci.getPart(new UriRef(uri), klass));
+                    zos.closeEntry();
+                } catch (IOException e) {
+                    cleanUp(file, zos, os);
+                    throw new StoreException("Failed to write the entry to the ZipOutputStream for: " + uri,
+                            e);
+                } catch (RuntimeException e) {
+                    // for clean up purposes
+                    cleanUp(file, zos, os);
+                    throw e;
+                }
+            }
+
+        } catch (JSONException e) {
+            cleanUp(file, zos, os);
+            throw new StoreException("Failed to read part information from the header", e);
+        }
+    }
+
+    private void cleanUp(File file, ZipOutputStream zos, OutputStream os) {
+        if (zos != null) {
+            try {
+                zos.finish();
+                zos.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+        if (os != null) {
+            try {
+                os.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+        file.delete();
+    }
+
+    private JSONObject createHeader(ContentItem ci) throws StoreException {
+        JSONObject header = new JSONObject();
+        JSONObject item = null;
+        Set<Class<?>> serializablePartTypes = contentPartSerializer.getSerializableTypes();
+        // eliminate subclasses not to obtain same content parts again
+        Class<?>[] serializablePartTypesArray = new Class<?>[serializablePartTypes.size()];
+        serializablePartTypes.toArray(serializablePartTypesArray);
+        Set<Class<?>> toBeRemoved = new HashSet<Class<?>>();
+        for (int i = 0; i < serializablePartTypesArray.length; i++) {
+            for (int j = i + 1; j < serializablePartTypesArray.length; j++) {
+                if (serializablePartTypesArray[j].isAssignableFrom(serializablePartTypesArray[i])) {
+                    toBeRemoved.add(serializablePartTypesArray[i]);
+                }
+            }
+        }
+        for (Class<?> klass : toBeRemoved) {
+            serializablePartTypes.remove(klass);
+        }
+        // add parts to header
+        int i = 0;
+        try {
+            String mainBlobUri = ci.getPartUri(0).getUnicodeString();
+
+            // append all parts of content item
+            for (Class<?> klass : serializablePartTypes) {
+                LinkedHashMap<UriRef,?> contentParts = ContentItemHelper.getContentParts(ci, klass);
+                for (Entry<UriRef,?> contentPartEntry : contentParts.entrySet()) {
+                    Object contentPart = contentPartEntry.getValue();
+                    String uri = contentPartEntry.getKey().getUnicodeString();
+                    item = new JSONObject();
+                    item.put("class", contentPart.getClass().getName());
+                    item.put("superclass", klass.getName());
+                    item.put("uri", uri);
+                    if (contentPart instanceof Blob) {
+                        item.put("mimetype", ((Blob) contentPart).getMimeType());
+                        // item.put("parameters", new JSONObject(((Blob) contentPart).getParameter()));
+                        if (uri.equals(mainBlobUri)) {
+                            header.put("mainblobindex", Integer.toString(i));
+                        }
+                    }
+                    header.put(Integer.toString(i++), item);
+                }
+            }
+            header.put("contentpartnum", i);
+
+        } catch (JSONException e) {
+            throw new StoreException("Failed to populate header with content parts metadata", e);
+        }
+        return header;
+    }
+
+    @Override
+    public ContentItem get(UriRef id) throws StoreException {
+        // get the zip file
+        String fileName = encodeId(id.getUnicodeString());
+        ZipFile zipFile;
+        try {
+            zipFile = new ZipFile(new File(storeFolder.getPath() + "/" + fileName + ".zip"));
+        } catch (ZipException e) {
+            throw new StoreException(String.format("Failed to get file for the given id: %s", id), e);
+        } catch (IOException e) {
+            throw new StoreException(String.format("Failed to get file for the given id: %s", id), e);
+        }
+
+        // deserialize headers
+        JSONObject header;
+        try {
+            InputStream is = zipFile.getInputStream(zipFile.getEntry("header"));
+            StringWriter writer = new StringWriter();
+            IOUtils.copy(is, writer);
+            String headerString = writer.toString();
+            header = new JSONObject(headerString);
+        } catch (IOException e) {
+            throw new StoreException(
+                    String.format("Failed to get header entry from the zip for of the content item: %s",
+                        id.getUnicodeString()), e);
+        } catch (JSONException e) {
+            throw new StoreException(
+                    String.format("Failed to get header entry from the zip for of the content item: %s",
+                        id.getUnicodeString()), e);
+        }
+
+        // deserialize metadata
+        MGraph metadata;
+        try {
+            metadata = contentPartDeserializer.deserializeContentPart(
+                zipFile.getInputStream(zipFile.getEntry("metadata")), IndexedMGraph.class);
+        } catch (IOException e) {
+            throw new StoreException(String.format(
+                "Failed to get metadata from the zip of the content item %s", e));
+        }
+
+        // deserialize main blob
+        String mainBlobUri;
+        StreamSource blobSource;
+        try {
+            String mainBlobIndex = header.getString("mainblobindex");
+            JSONObject mainBlobMetadata = header.getJSONObject(mainBlobIndex);
+            mainBlobUri = mainBlobMetadata.getString("uri");
+            blobSource = new StreamSource(zipFile.getInputStream(zipFile.getEntry(encodeId(mainBlobUri))),
+                    mainBlobMetadata.getString("mimetype"));
+        } catch (JSONException e) {
+            throw new StoreException(String.format(
+                "Failed to obtain main blob from the zip of the content item: %s", id.getUnicodeString()), e);
+        } catch (IOException e) {
+            throw new StoreException(String.format(
+                "Failed to obtain main blob from the zip of the content item: %s", id.getUnicodeString()), e);
+        }
+
+        // create content item using the metadata and main blob
+        ContentItem ci;
+        try {
+            ci = contentItemFactory.createContentItem(id, blobSource, metadata);
+        } catch (IOException e) {
+            throw new StoreException("Failed to created the content item", e);
+        }
+
+        // deserialize other parts
+        int contentPartNum;
+        try {
+            contentPartNum = header.getInt("contentpartnum");
+        } catch (JSONException e) {
+            throw new StoreException("Failed to get the number of content parts from the header", e);
+        }
+
+        for (int i = 0; i < contentPartNum; i++) {
+            Object contentPart;
+            try {
+                JSONObject contentPartMetadata = header.getJSONObject(Integer.toString(i));
+                String partUri = contentPartMetadata.getString("uri");
+                if (!partUri.equals(mainBlobUri)) {
+                    contentPart = deserializeContentPart(ci, contentPartMetadata, zipFile);
+                    ci.addPart(new UriRef(partUri), contentPart);
+                }
+            } catch (JSONException e) {
+                throw new StoreException("Failed get content part metadata", e);
+            }
+        }
+
+        return ci;
+    }
+
+    /**
+     * This method tries to deserialize content part whose metadata is passed in
+     * <code>contentPartMetadata</code> object. During the deserialization process, it first tries to specific
+     * type of the part. If the operation is unsuccessful, it tries the associated class which was used when
+     * the content part is obtained from the {@link ContentItem}.
+     * 
+     */
+    private <T> T deserializeContentPart(ContentItem contentItem,
+                                         JSONObject contentPartMetadata,
+                                         ZipFile zipFile) throws StoreException {
+        String partType;
+        String partUri;
+        String partSuperType;
+        T contentPart;
+
+        try {
+            partType = contentPartMetadata.getString("class");
+            partUri = contentPartMetadata.getString("uri");
+            partSuperType = contentPartMetadata.getString("superclass");
+
+        } catch (JSONException e) {
+            throw new StoreException("Failed to read content part metadata from the header", e);
+        }
+
+        InputStream partStream;
+        try {
+            partStream = zipFile.getInputStream(zipFile.getEntry(encodeId(partUri)));
+        } catch (IOException e) {
+            throw new StoreException(String.format(
+                "Failed to retrive the entry named %s from the zip of the content item: %s",
+                encodeId(partUri), contentItem.getUri().getUnicodeString()), e);
+        }
+        try {
+            contentPart = contentPartDeserializer.deserializeContentPart(partStream, partType);
+        } catch (StoreException e) {
+            log.warn(
+                "Failed to deserialize main blob for the class: {}. Trying to serialize with the superclass: {}",
+                partType, partSuperType);
+            contentPart = contentPartDeserializer.deserializeContentPart(partStream, partSuperType);
+        }
+
+        return contentPart;
+    }
+
+    @Override
+    public ChangeSet changes(long revision, int offset, int batchSize) throws StoreException {
+        ChangeSetImpl changesSet = (ChangeSetImpl) revisionManager.getChanges(revision, offset, batchSize);
+        changesSet.setStore(this);
+        return changesSet;
+    }
+
+    private void checkStoreFolder() throws StoreException {
+        if (!storeFolder.exists()) {
+            throw new StoreException("Store folder does not exist");
+        }
+    }
+
+    private String encodeId(String id) throws StoreException {
+        try {
+            return URLEncoder.encode(id, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            log.error("Failed to encode id. {}", id, e);
+            throw new StoreException("Failed to encode id: " + id, e);
+        }
+    }
+
+    public List<JSONObject> getRecentlyEnhancedItems(int limit, int offset) throws StoreException {
+        // get connection
+        Connection con = dbManager.getConnection();
+
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        List<JSONObject> recentlyEnhancedList = new ArrayList<JSONObject>();
+        try {
+            ps = con.prepareStatement(SELECT_RECENTLY_ENHANCED_ITEMS);
+            ps.setMaxRows(limit);
+            ps.setInt(1, offset);
+            rs = ps.executeQuery();
+
+            JSONObject htmlData;
+            while (rs.next()) {
+                htmlData = new JSONObject();
+                htmlData.put(FIELD_ID, rs.getString(FIELD_ID));
+                htmlData.put(FIELD_MIME_TYPE, rs.getString(FIELD_MIME_TYPE));
+                htmlData.put(FIELD_TITLE, rs.getString(FIELD_TITLE));
+                htmlData.put(FIELD_ENHANCEMENT_COUNT, rs.getInt(3));
+                recentlyEnhancedList.add(htmlData);
+            }
+        } catch (SQLException e) {
+            throw new StoreException("Failed to get recently enhanced items", e);
+        } catch (JSONException e) {
+            throw new StoreException("Failed to get recently enhanced items", e);
+        } finally {
+            dbManager.closeResultSet(rs);
+            dbManager.closeStatement(ps);
+            dbManager.closeConnection(con);
+        }
+
+        return recentlyEnhancedList;
+    }
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStoreDBManager.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStoreDBManager.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStoreDBManager.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/FileStoreDBManager.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file;
+
+import static org.apache.stanbol.contenthub.store.file.FileStore.*;
+import static org.apache.stanbol.contenthub.store.file.FileRevisionManager.REVISION_TABLE_NAME;
+
+import java.net.ConnectException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the Apache Derby database tables utilized in the scope of {@link FileStore}. It is
+ * responsible only for existence of the tables. Population of the tables is done by dedicated classes. This
+ * class also provides common methods regarding with SQL objects e.g obtaining connection; closing connection,
+ * statement, result set.
+ * 
+ * @author suat
+ * 
+ */
+@Component(immediate = true)
+@Service(value = FileStoreDBManager.class)
+public class FileStoreDBManager {
+
+    private static Logger log = LoggerFactory.getLogger(FileRevisionManager.class);
+
+    private static int MAX_ID_LENGTH = 1024;
+
+    private static final String CREATE_REVISION_TABLE = "CREATE TABLE " + REVISION_TABLE_NAME + " ("
+                                                        + "id VARCHAR(" + MAX_ID_LENGTH
+                                                        + ") NOT NULL PRIMARY KEY,"
+                                                        + "revision BIGINT NOT NULL)";
+
+    private static final String CREATE_RECENTLY_ENHANCED_TABLE = "CREATE TABLE "
+                                                                 + RECENTLY_ENHANCED_TABLE_NAME + " ("
+                                                                 + FIELD_ID + " VARCHAR(" + MAX_ID_LENGTH
+                                                                 + " ) NOT NULL PRIMARY KEY,"
+                                                                 + FIELD_MIME_TYPE + " VARCHAR("
+                                                                 + MAX_ID_LENGTH + "),"
+                                                                 + FIELD_ENHANCEMENT_COUNT + " BIGINT,"
+                                                                 + FIELD_TITLE + " VARCHAR(" + MAX_ID_LENGTH
+                                                                 + "))";
+
+    private static String DB_URL;
+
+    @Activate
+    protected void activate(ComponentContext componentContext) throws StoreException {
+        String stanbolHome = componentContext.getBundleContext().getProperty("sling.home");
+        DB_URL = "jdbc:derby:" + stanbolHome + "/filestore/filestorerevisions;create=true";
+        log.info("Initializing file store revision database");
+        Connection con = getConnection();
+        Statement stmt = null;
+        try {
+            // try to create revision table
+            if (!existsTable(FileRevisionManager.REVISION_TABLE_NAME)) {
+                stmt = con.createStatement();
+                stmt.executeUpdate(CREATE_REVISION_TABLE);
+                log.info("Revision table created.");
+            } else {
+                log.info("Revision table already exists");
+            }
+
+            // try to create recently_enhanced table
+            if (!existsTable(RECENTLY_ENHANCED_TABLE_NAME)) {
+                stmt = con.createStatement();
+                stmt.executeUpdate(CREATE_RECENTLY_ENHANCED_TABLE);
+                log.info("RecentlyEnhanced table created.");
+            } else {
+                log.info("RecentlyEnhanced table already exists");
+            }
+        } catch (SQLException e) {
+            log.error("Failed to create table", e);
+            throw new StoreException("Failed to create table", e);
+        } finally {
+            closeStatement(stmt);
+            closeConnection(con);
+        }
+        log.info("File store databases initialized.");
+    }
+
+    /**
+     * Check for the existence of the specified table
+     * 
+     * @param tableName
+     *            name of the table to be checked
+     * @return <code>true</code> if the table already exists, otherwise <code>false</code>.
+     * @throws StoreException
+     */
+    public boolean existsTable(String tableName) throws StoreException {
+        boolean exists = false;
+        ResultSet rs = null;
+        Connection con = getConnection();
+        try {
+            con = DriverManager.getConnection(DB_URL);
+            DatabaseMetaData meta = con.getMetaData();
+            rs = meta.getTables(null, null, null, new String[] {"TABLE"});
+            while (rs.next()) {
+                if (rs.getString("TABLE_NAME").equalsIgnoreCase(tableName)) {
+                    exists = true;
+                    break;
+                }
+            }
+        } catch (SQLException e) {
+            log.error("Failed to check existence of the table: {}", tableName);
+            throw new StoreException(String.format("Failed to check existence of the table: %s", tableName),
+                    e);
+        } finally {
+            closeResultSet(rs);
+            closeConnection(con);
+        }
+        return exists;
+    }
+
+    /**
+     * Closes the given {@link Connection}
+     * 
+     * @param con
+     */
+    public void closeConnection(Connection con) {
+        if (con != null) {
+            try {
+                con.close();
+            } catch (SQLException e) {
+                log.warn("Failed to close connection", e);
+            }
+        }
+    }
+
+    /**
+     * Closes the given {@link Statement}
+     * 
+     * @param stmt
+     */
+    public void closeStatement(Statement stmt) {
+        if (stmt != null) {
+            try {
+                stmt.close();
+            } catch (SQLException e) {
+                log.warn("Failed to close prepared statement", e);
+            }
+        }
+    }
+
+    /**
+     * Closes the given {@link ResultSet}
+     * 
+     * @param rs
+     */
+    public void closeResultSet(ResultSet rs) {
+        if (rs != null) {
+            try {
+                rs.close();
+            } catch (SQLException e) {
+                log.warn("Failed to close result set", e);
+            }
+        }
+    }
+
+    /**
+     * Creates a {@link ConnectException}
+     * 
+     * @return the {@link Connection} if successfully established
+     * @throws StoreException
+     */
+    public Connection getConnection() throws StoreException {
+        Connection con = null;
+        try {
+            con = DriverManager.getConnection(DB_URL);
+        } catch (SQLException e) {
+            log.error("Failed to obtain Derby connection", e);
+            throw new StoreException("Failed to obtain Derby connection", e);
+        }
+        return con;
+    }
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobDeserializerProvider.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobDeserializerProvider.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobDeserializerProvider.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobDeserializerProvider.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.apache.stanbol.enhancer.servicesapi.Blob;
+import org.apache.stanbol.enhancer.servicesapi.ContentItemFactory;
+import org.apache.stanbol.enhancer.servicesapi.impl.StreamSource;
+
+/**
+ * {@link ContentPartDeserializerProvider} for {@link Blob} objects. First a {@link StreamSource} is created
+ * from the given {@link InputStream} and this source is fed to the
+ * {@link ContentItemFactory#createBlob(org.apache.stanbol.enhancer.servicesapi.ContentSource)}.
+ * 
+ * @author suat
+ * 
+ */
+@Component
+@Service
+public class BlobDeserializerProvider implements ContentPartDeserializerProvider {
+
+    @Reference
+    ContentItemFactory contentItemFactory;
+
+    @Override
+    public Set<Class<?>> getSupportedContentPartTypes() {
+        Set<Class<?>> supportedClasses = new HashSet<Class<?>>();
+        supportedClasses.add(Blob.class);
+        return supportedClasses;
+    }
+
+    @Override
+    public <T> T deserialize(InputStream inputStream) throws StoreException {
+        return deserialize(inputStream, null, null);
+    }
+
+    @Override
+    public <T> T deserialize(InputStream inputStream, String mediaType) throws StoreException {
+        return deserialize(inputStream, mediaType, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T deserialize(InputStream inputStream, String mediaType, Map<String,List<String>> headers) throws StoreException {
+        Blob blob = null;
+        try {
+            blob = contentItemFactory.createBlob(new StreamSource(inputStream, mediaType, headers));
+        } catch (IOException e) {
+            throw new StoreException("Failed to serialize Blob part from InputStream", e);
+        }
+        return (T) blob;
+    }
+
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobSerializerProvider.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobSerializerProvider.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobSerializerProvider.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/BlobSerializerProvider.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file.serializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.apache.stanbol.enhancer.servicesapi.Blob;
+
+/**
+ * {@link ContentPartSerializerProvider} for {@link Blob} objects. It basically copies the stream obtained
+ * through the {@link Blob#getStream()} method to the given {@link OutputStream}.
+ * 
+ * @author suat
+ * 
+ */
+@Component
+@Service
+public class BlobSerializerProvider implements ContentPartSerializerProvider {
+
+    @Override
+    public List<Class<?>> getSupportedContentPartTypes() {
+        return Arrays.asList(new Class<?>[] {Blob.class});
+    }
+
+    @Override
+    public <T> void serialize(OutputStream outputStream, T contentPart) throws StoreException {
+        Blob blobPart = (Blob) contentPart;
+        try {
+            IOUtils.copy(blobPart.getStream(), outputStream);
+        } catch (IOException e) {
+            throw new StoreException("Failed to write Blob part to OutputStream", e);
+        }
+    }
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializer.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializer.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializer.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializer.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file.serializer;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.ReferenceStrategy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the manager class delegating the content part deserialize requests to the suitable
+ * {@link ContentPartDeserializerProvider}s instances that are already registered in the OSGi environment.
+ * 
+ * @author suat
+ * 
+ */
+@Component
+@Service(value = ContentPartDeserializer.class)
+public class ContentPartDeserializer {
+    private static final Logger log = LoggerFactory.getLogger(ContentPartDeserializer.class);
+
+    private static Map<Class<?>,ContentPartDeserializerProvider> deserializerMap = new HashMap<Class<?>,ContentPartDeserializerProvider>();
+
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, referenceInterface = ContentPartDeserializerProvider.class, policy = ReferencePolicy.DYNAMIC, strategy = ReferenceStrategy.EVENT, bind = "bindContentPartDeserializerProvider", unbind = "unbindContentPartDeserializerProvider")
+    private List<ContentPartDeserializerProvider> deserializerList = new ArrayList<ContentPartDeserializerProvider>();
+
+    /**
+     * Deserializes the content part which will be read from the given {@link InputStream} using a
+     * {@link ContentPartDeserializerProvider} which will be obtained by the given {@link Class}. The first
+     * deserializer compatible with the given class is used. The types supported by a deserializer are
+     * compared with the given class parameter through the {@link Class#isAssignableFrom(Class)} method. This
+     * means that a deserializer is compatible with the given class if one of the supported types by
+     * deserializer either is a superclass or superinterface of the given class or same with the given class.
+     * 
+     * @param <T>
+     *            Generic type representing the content part to be returned
+     * @param is
+     *            {@link InputStream} from which the content part data will be read
+     * @param klass
+     *            Type for which a deserializer instance will be obtained.
+     * @return the deserialized content part
+     * @throws StoreException
+     */
+    public <T> T deserializeContentPart(InputStream is, Class<?> klass) throws StoreException {
+        ContentPartDeserializerProvider deserializer = null;
+        synchronized (deserializerList) {
+            for (Entry<Class<?>,ContentPartDeserializerProvider> e : deserializerMap.entrySet()) {
+                if (e.getKey().isAssignableFrom(klass)) {
+                    deserializer = e.getValue();
+                    break;
+                }
+            }
+        }
+        if (deserializer == null) {
+            throw new StoreException(String.format(
+                "Failed to obtain serializer for the content part having type: %s", klass.getName()));
+        }
+        return deserializer.deserialize(is);
+    }
+
+    /**
+     * Deserializes the content part which will be read from the given {@link InputStream} using a
+     * {@link ContentPartDeserializerProvider} which will be obtained by the given class name. In the first
+     * step, to be able to get a dedicated deserializer for the given <code>className</code>, this method
+     * checks a suitable deserializers using this name. This is done by comparing the name parameter with the
+     * name of the supported types of registered deserializers. If this attempt is unsuccessful, a serializer
+     * is tried to be obtained through the {@link #deserializeContentPart(InputStream, Class)} method after
+     * getting a {@link Class} from the given class name by {@link Class#forName(String)}.
+     * 
+     * @param <T>
+     *            Generic type of representing content part to be returned
+     * @param is
+     *            {@link InputStream} from which the content part data will be read
+     * @param className
+     *            Name of the class for which a deserializer instance will be obtained.
+     * @return the deserialized content part
+     * @throws StoreException
+     */
+    public <T> T deserializeContentPart(InputStream is, String className) throws StoreException {
+        ContentPartDeserializerProvider deserializer = null;
+        synchronized (deserializerList) {
+            for (Entry<Class<?>,ContentPartDeserializerProvider> e : deserializerMap.entrySet()) {
+                if (e.getKey().getName().equals(className)) {
+                    deserializer = e.getValue();
+                    break;
+                }
+            }
+        }
+        if (deserializer == null) {
+            log.info("No deserializer supporting directly the class: {}", className);
+            try {
+                Class<?> klass = Class.forName(className);
+                return deserializeContentPart(is, klass);
+            } catch (ClassNotFoundException e) {
+                throw new StoreException(String.format("Failed to load class: %s", className));
+            }
+        }
+        return deserializer.deserialize(is);
+    }
+
+    /**
+     * Returns the set of classes that are deserializable by the registered
+     * {@link ContentPartDeserializerProvider}s.
+     * 
+     * @return {@link Set} of deserializable classes.
+     */
+    public Set<Class<?>> getSerializableTypes() {
+        return deserializerMap.keySet();
+    }
+
+    protected void bindContentPartDeserializerProvider(ContentPartDeserializerProvider deserializerProvider) {
+        synchronized (deserializerList) {
+            deserializerList.add(deserializerProvider);
+            refreshDeserializerMap();
+        }
+    }
+
+    protected void unbindContentPartDeserializerProvider(ContentPartDeserializerProvider deserializerProvider) {
+        synchronized (deserializerList) {
+            deserializerList.remove(deserializerProvider);
+            refreshDeserializerMap();
+        }
+    }
+
+    private void refreshDeserializerMap() {
+        Map<Class<?>,ContentPartDeserializerProvider> newSerializerMap = new HashMap<Class<?>,ContentPartDeserializerProvider>();
+        for (ContentPartDeserializerProvider deserializer : deserializerList) {
+            Set<Class<?>> classes = deserializer.getSupportedContentPartTypes();
+            for (Class<?> c : classes) {
+                newSerializerMap.put(c, deserializer);
+            }
+        }
+        deserializerMap = newSerializerMap;
+    }
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializerProvider.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializerProvider.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializerProvider.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartDeserializerProvider.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file.serializer;
+
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.apache.stanbol.enhancer.servicesapi.ContentItem;
+import org.apache.stanbol.enhancer.servicesapi.ContentSource;
+
+/**
+ * Deserializer for {@link ContentItem} parts. This interface is expected to be used while a content item is
+ * deserialized from a persistent storage. Different deserialization methods aim to provide a compatible usage
+ * with the {@link ContentSource} interface which is used during the content part creation process.
+ * 
+ * @author suat
+ * 
+ */
+public interface ContentPartDeserializerProvider {
+    /**
+     * Returns the {@link Set} of supported classes.
+     * 
+     * @return
+     */
+    Set<Class<?>> getSupportedContentPartTypes();
+
+    /**
+     * Serializes the content part to be retrieved from the given {@link InputStream}.
+     * 
+     * @param <T>
+     *            Generic type representing content part to be returned
+     * @param inputStream
+     *            {@link InputStream} providing the raw content part
+     * @return deserialized content part
+     * @throws StoreException
+     */
+    <T> T deserialize(InputStream inputStream) throws StoreException;
+
+    /**
+     * Serializes the content part to be retrieved from the given {@link InputStream}.
+     * 
+     * @param <T>
+     *            Generic type representing content part to be returned
+     * @param inputStream
+     *            {@link InputStream} providing the raw content part
+     * @param mediaType
+     *            Media type of the content part
+     * 
+     * @return deserialized content part
+     * @throws StoreException
+     */
+    <T> T deserialize(InputStream inputStream, String mediaType) throws StoreException;
+
+    /**
+     * Serialized the content part to be retrieved from the given {@link InputStream}.
+     * 
+     * @param <T>
+     *            Generic type representing content part to be returned
+     * @param inputStream
+     *            {@link InputStream} providing the raw content part
+     * @param mediaType
+     *            Media type of the content part
+     * @param headers
+     *            Optional headers for the the content part
+     * @return
+     * @throws StoreException
+     */
+    <T> T deserialize(InputStream inputStream, String mediaType, Map<String,List<String>> headers) throws StoreException;
+}

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializer.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializer.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializer.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializer.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file.serializer;
+
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.ReferenceStrategy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+
+/**
+ * This is the manager class delegating the content part serialize requests to the suitable
+ * {@link ContentPartSerializerProvider}s instances that are already registered in the OSGi environment.
+ * 
+ * @author suat
+ * 
+ */
+@Component
+@Service(value = ContentPartSerializer.class)
+public class ContentPartSerializer {
+    private static Map<Class<?>,ContentPartSerializerProvider> serializerMap = new HashMap<Class<?>,ContentPartSerializerProvider>();
+
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, referenceInterface = ContentPartSerializerProvider.class, policy = ReferencePolicy.DYNAMIC, strategy = ReferenceStrategy.EVENT, bind = "bindContentPartSerializerProvider", unbind = "unbindContentPartSerializerProvider")
+    private List<ContentPartSerializerProvider> serializerList = new ArrayList<ContentPartSerializerProvider>();
+
+    /**
+     * Serializes the content part to the given {@link OutputStream} using a
+     * {@link ContentPartSerializerProvider} which will be obtained by the class of the content part. The
+     * first serializer compatible with the class of the content part is used. The types of supported by a
+     * serializer is compared with the class of the given content part through the
+     * {@link Class#isAssignableFrom(Class)} method. This means that a serializer is compatible with the given
+     * content part if one of the supported types by serializer either is a superclass or superinterface of
+     * the class of the content part or same with the class of the content part.
+     * 
+     * @param <T>
+     *            Generic type representing content part to be serialized
+     * @param os
+     *            {@link OutputStream} to which the content part will be serialized
+     * @param contentPart
+     *            Content part to be serialized
+     * @throws StoreException
+     */
+    public <T> void serializeContentPart(OutputStream os, T contentPart) throws StoreException {
+        ContentPartSerializerProvider serializer = null;
+        synchronized (serializerList) {
+            for (Entry<Class<?>,ContentPartSerializerProvider> e : serializerMap.entrySet()) {
+                if (e.getKey().isAssignableFrom(contentPart.getClass())) {
+                    serializer = e.getValue();
+                    break;
+                }
+            }
+        }
+        if (serializer == null) {
+            throw new StoreException(String.format(
+                "Failed to obtain serializer for the content part having type: %s", contentPart.getClass()
+                        .getName()));
+        }
+        serializer.serialize(os, contentPart);
+    }
+
+    /**
+     * Returns the set of classes that are serializable by the registered
+     * {@link ContentPartSerializerProvider}s.
+     * 
+     * @return {@link Set} of serializable classes.
+     */
+    public Set<Class<?>> getSerializableTypes() {
+        return serializerMap.keySet();
+    }
+
+    protected void bindContentPartSerializerProvider(ContentPartSerializerProvider serializerProvider) {
+        synchronized (serializerList) {
+            serializerList.add(serializerProvider);
+            refreshSerializerMap();
+        }
+    }
+
+    protected void unbindContentPartSerializerProvider(ContentPartSerializerProvider serializerProvider) {
+        synchronized (serializerList) {
+            serializerList.remove(serializerProvider);
+            refreshSerializerMap();
+        }
+    }
+
+    private <T> void refreshSerializerMap() {
+        Map<Class<?>,ContentPartSerializerProvider> newSerializerMap = new HashMap<Class<?>,ContentPartSerializerProvider>();
+        for (ContentPartSerializerProvider serializer : serializerList) {
+            List<Class<?>> classes = serializer.getSupportedContentPartTypes();
+            for (Class<?> c : classes) {
+                newSerializerMap.put(c, serializer);
+            }
+        }
+        serializerMap = newSerializerMap;
+    }
+}
\ No newline at end of file

Added: incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializerProvider.java
URL: http://svn.apache.org/viewvc/incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializerProvider.java?rev=1345253&view=auto
==============================================================================
--- incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializerProvider.java (added)
+++ incubator/stanbol/branches/contenthub-two-layered-structure/contenthub/store/file/src/main/java/org/apache/stanbol/contenthub/store/file/serializer/ContentPartSerializerProvider.java Fri Jun  1 15:48:24 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stanbol.contenthub.store.file.serializer;
+
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.stanbol.contenthub.servicesapi.store.StoreException;
+import org.apache.stanbol.enhancer.servicesapi.ContentItem;
+
+/**
+ * Serializer for {@link ContentItem} parts. This interface is expected to be used while a {@link ContentItem}
+ * is serialized into a persistent storage.
+ * 
+ * @author suat
+ * 
+ */
+public interface ContentPartSerializerProvider {
+    /**
+     * Returns the {@link Set} of classes which are supported by a specific instance of
+     * {@link ContentPartSerializerProvider}.
+     * 
+     * @return
+     */
+    List<Class<?>> getSupportedContentPartTypes();
+
+    /**
+     * Serializes the provided content part to the provided {@link OutputStream}.
+     * 
+     * @param <T>
+     *            Generic type representing content part to be serialized
+     * @param outputStream
+     *            {@link OutputStream} into which the content part will be serialized
+     * @param contentPart
+     *            Content part to be serialized
+     * @throws StoreException
+     */
+    <T> void serialize(OutputStream outputStream, T contentPart) throws StoreException;
+}