You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oodt.apache.org by ri...@apache.org on 2012/02/21 05:21:50 UTC

svn commit: r1291587 - in /oodt/trunk: ./ grid/src/main/java/org/apache/oodt/grid/ xmlps/ xmlps/src/main/java/org/apache/oodt/xmlps/product/ xmlps/src/main/java/org/apache/oodt/xmlps/structs/ xmlps/src/test/java/org/apache/oodt/xmlps/structs/

Author: rickdn
Date: Tue Feb 21 04:21:50 2012
New Revision: 1291587

URL: http://svn.apache.org/viewvc?rev=1291587&view=rev
Log:
OODT-341 #resolve fixed CDEResultInputStream allows XMLPS to stream responses via chunked Transfer-Encoding

Added:
    oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResultInputStream.java
    oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/
    oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResult.java
    oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResultInputStream.java
Modified:
    oodt/trunk/CHANGES.txt
    oodt/trunk/grid/src/main/java/org/apache/oodt/grid/ProductQueryServlet.java
    oodt/trunk/xmlps/pom.xml
    oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/DBMSExecutor.java
    oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/XMLPSProductHandler.java
    oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResult.java

Modified: oodt/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/oodt/trunk/CHANGES.txt?rev=1291587&r1=1291586&r2=1291587&view=diff
==============================================================================
--- oodt/trunk/CHANGES.txt (original)
+++ oodt/trunk/CHANGES.txt Tue Feb 21 04:21:50 2012
@@ -3,6 +3,8 @@ Apache OODT Change Log
 
 Release 0.4: Current Development
 --------------------------------------------
+* OODT-341 XMLPS should be able to stream large results (rickdn)
+
 * OODT-375 Improve ApplicationResponse::includeJavascript to support including 
   JavaScript snippets in addition to static files (ahart)
 

Modified: oodt/trunk/grid/src/main/java/org/apache/oodt/grid/ProductQueryServlet.java
URL: http://svn.apache.org/viewvc/oodt/trunk/grid/src/main/java/org/apache/oodt/grid/ProductQueryServlet.java?rev=1291587&r1=1291586&r2=1291587&view=diff
==============================================================================
--- oodt/trunk/grid/src/main/java/org/apache/oodt/grid/ProductQueryServlet.java (original)
+++ oodt/trunk/grid/src/main/java/org/apache/oodt/grid/ProductQueryServlet.java Tue Feb 21 04:21:50 2012
@@ -17,13 +17,6 @@
 
 package org.apache.oodt.grid;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 import org.apache.oodt.product.LargeProductQueryHandler;
 import org.apache.oodt.product.ProductException;
 import org.apache.oodt.product.QueryHandler;
@@ -32,21 +25,32 @@ import org.apache.oodt.xmlquery.LargeRes
 import org.apache.oodt.xmlquery.Result;
 import org.apache.oodt.xmlquery.XMLQuery;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
 
 /**
  * Product query servlet handles product queries.  It always returns the first matching
  * product, if any.  If no handler can provide a product, it returns 404 Not Found.  If
  * there are no query handlers, it returns 404 Not Found.
- * 
+ *
  */
 public class ProductQueryServlet extends QueryServlet {
 	/** {@inheritDoc} */
-	protected List getServers(Configuration config) {
+	@Override
+  protected List getServers(Configuration config) {
 		return config.getProductServers();
 	}
 
 	/** {@inheritDoc} */
-	protected void handleQuery(XMLQuery query, List handlers, HttpServletRequest req, HttpServletResponse res)
+	@Override
+  protected void handleQuery(XMLQuery query, List handlers, HttpServletRequest req, HttpServletResponse res)
 		throws IOException, ServletException {
 		if (handlers.isEmpty()) {
 			res.sendError(HttpServletResponse.SC_NOT_FOUND, "no query handlers available to handle query");
@@ -94,6 +98,7 @@ public class ProductQueryServlet extends
 			int num;						       // And a place to count data
 			while ((num = in.read(buf)) != -1)			       // While we read
 				res.getOutputStream().write(buf, 0, num);	       // We write
+			res.getOutputStream().flush();
 		} finally {							       // And finally
 			if (in != null) try {					       // If we opened it
 				in.close();					       // Close it
@@ -111,7 +116,8 @@ public class ProductQueryServlet extends
 		String contentType = result.getMimeType();			       // Grab the content type
 		res.setContentType(contentType);				       // Set it
 		long size = result.getSize();					       // Grab the size
-		res.addHeader("Content-Length", String.valueOf(size));		       // Don't use setContentLength(int)
+		if (size >= 0)
+		  res.addHeader("Content-Length", String.valueOf(size));		       // Don't use setContentLength(int)
 		if (!displayable(contentType))					       // Finally, if a browser can't show it
 			suggestFilename(result.getResourceID(), res);		       // Then suggest a save-as filename
 	}
@@ -178,7 +184,7 @@ public class ProductQueryServlet extends
 		}
 
 		/** Handler to use. */
-		private LargeProductQueryHandler handler;
+		private final LargeProductQueryHandler handler;
 	}
 }
 

Modified: oodt/trunk/xmlps/pom.xml
URL: http://svn.apache.org/viewvc/oodt/trunk/xmlps/pom.xml?rev=1291587&r1=1291586&r2=1291587&view=diff
==============================================================================
--- oodt/trunk/xmlps/pom.xml (original)
+++ oodt/trunk/xmlps/pom.xml Tue Feb 21 04:21:50 2012
@@ -124,11 +124,19 @@
 			<groupId>javax.servlet</groupId>
 			<artifactId>servlet-api</artifactId>
 			<version>2.3</version>
+			<scope>provided</scope>
 		</dependency>
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
 			<version>3.8.2</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.easymock</groupId>
+			<artifactId>easymock</artifactId>
+			<version>3.0</version>
+			<scope>test</scope>
 		</dependency>
 	</dependencies>
 </project>

Modified: oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/DBMSExecutor.java
URL: http://svn.apache.org/viewvc/oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/DBMSExecutor.java?rev=1291587&r1=1291586&r2=1291587&view=diff
==============================================================================
--- oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/DBMSExecutor.java (original)
+++ oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/DBMSExecutor.java Tue Feb 21 04:21:50 2012
@@ -19,22 +19,12 @@ package org.apache.oodt.xmlps.product;
 
 //OODT imports
 import org.apache.oodt.commons.database.DatabaseConnectionBuilder;
-import org.apache.oodt.xmlps.mapping.FieldType;
-import org.apache.oodt.xmlps.mapping.Mapping;
-import org.apache.oodt.xmlps.mapping.MappingField;
-import org.apache.oodt.xmlps.mapping.funcs.MappingFunc;
 import org.apache.oodt.xmlps.structs.CDEResult;
-import org.apache.oodt.xmlps.structs.CDERow;
-import org.apache.oodt.xmlps.structs.CDEValue;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import javax.sql.DataSource;
 
@@ -50,96 +40,27 @@ public class DBMSExecutor {
 
   private final DataSource dataSource;
 
-  private static final Logger LOG = Logger.getLogger(DBMSExecutor.class
-      .getName());
-
   public DBMSExecutor() {
     String jdbcUrl = System.getProperty("xmlps.datasource.jdbc.url");
     String user = System.getProperty("xmlps.datasource.jdbc.user");
     String pass = System.getProperty("xmlps.datasource.jdbc.pass");
     String driver = System.getProperty("xmlps.datasource.jdbc.driver");
-    dataSource = DatabaseConnectionBuilder.buildDataSource(user, pass, driver,
-        jdbcUrl);
+    dataSource = DatabaseConnectionBuilder.buildDataSource(user, pass, driver, jdbcUrl);
   }
 
-  public CDEResult executeLocalQuery(Mapping map, String sql,
-      List<String> returnNames) throws SQLException {
-    Connection conn = null;
-    Statement statement = null;
-
-    CDEResult result = null;
-
+  public CDEResult executeLocalQuery(String sql) throws SQLException {
     try {
-      conn = dataSource.getConnection();
-      statement = conn.createStatement();
+      Connection conn = dataSource.getConnection();
+      Statement statement = conn.createStatement();
       ResultSet rs = statement.executeQuery(sql);
-
-      result = new CDEResult();
-
-      while (rs.next()) {
-        CDERow row = toCDERow(rs, map, returnNames);
-        result.getRows().add(row);
-      }
-
+      CDEResult result = new CDEResult(rs, conn);
+      return result;
     } catch (SQLException e) {
       e.printStackTrace();
       throw e;
-    } finally {
-      if (statement != null) {
-        try {
-          statement.close();
-        } catch (Exception ignore) {
-        }
-
-        statement = null;
-      }
-
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (Exception ignore) {
-        }
-
-        conn = null;
-      }
     }
-
-    return result;
-
+    // do not close the Statement or Connection here
+    // call CDEResult#close() to close ResultSet and Connection
   }
-
-  private CDERow toCDERow(ResultSet rs, Mapping map, List<String> returnNames) {
-    CDERow row = new CDERow();
-    if (returnNames != null && returnNames.size() > 0) {
-      for (Iterator<String> i = returnNames.iterator(); i.hasNext();) {
-        String retName = i.next();
-        MappingField fld = map.getFieldByName(retName);
-        // only handle dynamic fields here
-        // if it was a constant field, then it will be dealt with
-        // later
-        if (fld.getType().equals(FieldType.DYNAMIC)) {
-          // go ahead and add it in
-          try {
-            String elemDbVal = rs.getString(retName);
-            for (Iterator<MappingFunc> j = fld.getFuncs().iterator(); j
-                .hasNext();) {
-              MappingFunc func = j.next();
-              CDEValue origVal = new CDEValue(fld.getName(), elemDbVal);
-              CDEValue newVal = func.inverseTranslate(origVal);
-              elemDbVal = newVal.getVal();
-            }
-
-            row.getVals().add(new CDEValue(fld.getName(), elemDbVal));
-          } catch (SQLException e) {
-            LOG.log(Level.WARNING, "Unable to obtain field: [" + retName
-                + "] from result set: message: " + e.getMessage());
-          }
-        }
-      }
-    }
-
-    return row;
-
-  }
-
+  
 }

Modified: oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/XMLPSProductHandler.java
URL: http://svn.apache.org/viewvc/oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/XMLPSProductHandler.java?rev=1291587&r1=1291586&r2=1291587&view=diff
==============================================================================
--- oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/XMLPSProductHandler.java (original)
+++ oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/product/XMLPSProductHandler.java Tue Feb 21 04:21:50 2012
@@ -29,11 +29,9 @@ import org.apache.oodt.xmlps.mapping.fun
 import org.apache.oodt.xmlps.queryparser.Expression;
 import org.apache.oodt.xmlps.queryparser.HandlerQueryParser;
 import org.apache.oodt.xmlps.structs.CDEResult;
-import org.apache.oodt.xmlps.structs.CDERow;
 import org.apache.oodt.xmlps.structs.CDEValue;
 import org.apache.oodt.xmlps.util.XMLQueryHelper;
 import org.apache.oodt.xmlquery.QueryElement;
-import org.apache.oodt.xmlquery.Result;
 import org.apache.oodt.xmlquery.XMLQuery;
 
 import java.io.FileInputStream;
@@ -230,22 +228,10 @@ public class XMLPSProductHandler impleme
 
         if (executor != null) {
             try {
-                CDEResult res = executor.executeLocalQuery(this.mapping,
-                        sqlBuf.toString(), toSQLResultSetColumns(selectNames));
-
-                res = addConstFields(res,
-                        getConstElemNamesFromQueryElemSet(query
-                                .getSelectElementSet()));
-
-                if (res.getRows() == null
-                        || (res.getRows() != null && res.getRows().size() == 0)) {
-                  LOG.log(Level.WARNING, "Query retrieved no results.");
-                }
-                
-                Result r = res.toResult();
-                if (r != null) {
-                    query.getResults().add(r);
-                }
+                CDEResult res = executor.executeLocalQuery(sqlBuf.toString());
+                res.setMapping(mapping);
+                res.setConstValues(getConstValuesForQuery(query));
+                query.getResults().add(res);
             } catch (SQLException e) {
                 e.printStackTrace();
                 LOG.log(Level.WARNING, "Error executing sql: ["
@@ -255,35 +241,19 @@ public class XMLPSProductHandler impleme
 
     }
 
-    private CDEResult addConstFields(CDEResult res,
-            List<QueryElement> constFlds) {
-        CDEResult newRes = res;
-        if (constFlds != null && constFlds.size() > 0) {
-            if (res == null
-                    || (res != null && (res.getRows() == null || (res.getRows() != null && res
-                            .getRows().size() == 0)))) {
-                newRes = new CDEResult();
-                // add one row
-                newRes.getRows().add(new CDERow());
-            }
-
-            for (Iterator<QueryElement> i = constFlds.iterator(); i.hasNext();) {
-                QueryElement elem = i.next();
-                MappingField fld = this.mapping.getFieldByLocalName(elem
-                        .getValue());
-
-                for (Iterator<CDERow> j = newRes.getRows().iterator(); j
-                        .hasNext();) {
-                    CDERow row = j.next();
-                    CDEValue val = new CDEValue(fld.getName(), fld
-                            .getConstantValue());
-                    row.getVals().add(val);
+    private List<CDEValue> getConstValuesForQuery(XMLQuery query) {
+        List<QueryElement> select = query.getSelectElementSet();
+        List<QueryElement> constNames = getConstElemNamesFromQueryElemSet(select);
+        List<CDEValue> constValues = new ArrayList<CDEValue>();
+        if (constNames != null) {
+            for (QueryElement qe : constNames) {
+                MappingField fld = mapping.getFieldByLocalName(qe.getValue());
+                if (fld != null) {
+                    constValues.add(new CDEValue(fld.getName(), fld.getConstantValue()));
                 }
             }
         }
-
-        return newRes;
-
+        return constValues;
     }
 
     private String toSQLSelectColumns(List<QueryElement> elems) {
@@ -306,21 +276,6 @@ public class XMLPSProductHandler impleme
         return buf.toString();
     }
 
-    private List<String> toSQLResultSetColumns(List<QueryElement> elems) {
-        if (elems == null || (elems != null && elems.size() == 0))
-          return Collections.emptyList();
-
-        List<String> resultSetNames = new ArrayList<String>();
-        for (QueryElement qe : elems) {
-            MappingField fld = this.mapping.getFieldByLocalName(qe.getValue());
-            if (fld != null) {
-                resultSetNames.add(fld.getName());
-            }
-        }
-
-        return resultSetNames;
-    }
-
     protected void translateToDomain(List<QueryElement> elemSet,
             boolean selectSet) throws Exception {
         // go through each query element: use the mapping fields
@@ -384,7 +339,7 @@ public class XMLPSProductHandler impleme
         }
 
     }
-  
+
     protected Set<DatabaseTable> getRequiredTables(
             List<QueryElement> whereElemNames, List<QueryElement> selectElemNames) {
         Set<DatabaseTable> tables = new HashSet<DatabaseTable>();

Modified: oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResult.java
URL: http://svn.apache.org/viewvc/oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResult.java?rev=1291587&r1=1291586&r2=1291587&view=diff
==============================================================================
--- oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResult.java (original)
+++ oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResult.java Tue Feb 21 04:21:50 2012
@@ -18,75 +18,112 @@
 package org.apache.oodt.xmlps.structs;
 
 //JDK imports
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Vector;
-
-//OODT imports
-import org.apache.oodt.commons.util.DateConvert;
+import org.apache.oodt.xmlps.mapping.Mapping;
+import org.apache.oodt.xmlps.mapping.MappingField;
+import org.apache.oodt.xmlps.mapping.funcs.MappingFunc;
 import org.apache.oodt.xmlquery.Result;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
 /**
- * 
- * <p>
- * A {@link List} of {@link CDERow}s returned from a query against a
- * Product Server.
- * </p>.
+ * A {@link Result} that wraps a {@link ResultSet} and returns rows as Strings,
+ * applying {@link MappingFuncs} if a {@link Mapping} is provided, and appending
+ * constant fields if a {@link List} of {@link CDEValue}s is provided.
  */
-public class CDEResult {
+public class CDEResult extends Result {
 
-    private List<CDERow> rows;
+  private static final long serialVersionUID = 1L;
 
-    private static final String ROW_TERMINATOR = "$";
+  private static final String ROW_TERMINATOR = "$";
 
-    public CDEResult() {
-        rows = new Vector<CDERow>();
+  private final ResultSet rs;
+  private final Connection con;
+  private Mapping mapping;
+  private List<CDEValue> constValues;
+
+  public CDEResult(ResultSet rs, Connection con) {
+    this.rs = rs;
+    this.con = con;
+    setMimeType("text/plain");
+  }
+
+  public void setMapping(Mapping mapping) {
+    this.mapping = mapping;
+  }
+
+  public void setConstValues(List<CDEValue> constValues) {
+    this.constValues = constValues;
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    if (rs == null || con == null)
+      throw new IOException("InputStream not ready, ResultSet or Connection is null!");
+    return new CDEResultInputStream(this);
+  }
+
+  @Override
+  public long getSize() {
+    return -1;
+  }
+
+ public void close() throws SQLException {
+    if (rs != null)
+      rs.close();
+    if (con != null)
+      con.close();
+  }
+
+  public String getNextRowAsString() throws SQLException {
+    if (rs.next()) {
+      CDERow row = createCDERow();
+      if (mapping != null)
+        applyMappingFuncs(row);
+      if (constValues != null)
+        addConstValues(row);
+      // if there is some kind of configurable response writer,
+      // here would be a nice place to put it...
+      return row.toString() + ROW_TERMINATOR;
     }
+    return null;
+  }
 
-    public Result toResult() {
-        String strVal = toString();
-        if (strVal == null || (strVal != null && strVal.equals(""))) {
-            return null;
-        }
-
-        Result r = new Result();
-        r.setID(DateConvert.isoFormat(new Date()));
-        r.setMimeType("text/plain");
-        r.setResourceID("UNKNOWN");
-        r.setValue(toString());
-        return r;
+  private CDERow createCDERow() throws SQLException {
+    CDERow row = new CDERow();
+    ResultSetMetaData met = rs.getMetaData();
+    int count = met.getColumnCount();
+    for (int i = 1; i <= count; i++) {
+      // since the SQL query was built with "SELECT ${fieldlocalname} as ${fieldname}"
+      // we know that ResultSet column names equal CDE field names
+      // and appear in the correct order as well
+      String colName = met.getColumnName(i);
+      String colValue = rs.getString(i);
+      CDEValue val = new CDEValue(colName, colValue);
+      row.getVals().add(val);
     }
+    return row;
+  }
 
-    public String toString() {
-        StringBuffer rStr = new StringBuffer();
-        if (rows != null && rows.size() > 0) {
-            for (Iterator<CDERow> i = rows.iterator(); i.hasNext();) {
-                CDERow row = i.next();
-                rStr.append(row.toString());
-                rStr.append(ROW_TERMINATOR);
-            }
-
-            rStr.deleteCharAt(rStr.length() - 1);
+  private void applyMappingFuncs(CDERow row) {
+    for (CDEValue value : row.getVals()) {
+      MappingField fld = mapping.getFieldByName(value.getCdeName());
+      if (fld != null) {
+        for (MappingFunc func : fld.getFuncs()) {
+          CDEValue newValue = func.inverseTranslate(value);
+          value.setVal(newValue.getVal());
         }
-
-        return rStr.toString();
-
+      }
     }
+  }
 
-    /**
-     * @return the rows
-     */
-    public List<CDERow> getRows() {
-        return rows;
-    }
-
-    /**
-     * @param rows
-     *            the rows to set
-     */
-    public void setRows(List<CDERow> rows) {
-        this.rows = rows;
-    }
+  private void addConstValues(CDERow row) {
+    row.getVals().addAll(constValues);
+  }
 
 }

Added: oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResultInputStream.java
URL: http://svn.apache.org/viewvc/oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResultInputStream.java?rev=1291587&view=auto
==============================================================================
--- oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResultInputStream.java (added)
+++ oodt/trunk/xmlps/src/main/java/org/apache/oodt/xmlps/structs/CDEResultInputStream.java Tue Feb 21 04:21:50 2012
@@ -0,0 +1,99 @@
+package org.apache.oodt.xmlps.structs;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.SQLException;
+
+class CDEResultInputStream extends InputStream {
+
+  private final CDEResult res;
+  private ByteArrayInputStream rowStream;
+
+  public CDEResultInputStream(CDEResult res) {
+    this.res = res;
+  }
+
+  private boolean fetchNextRow() throws IOException {
+    String s = null;
+    try {
+      s = res.getNextRowAsString();
+    } catch (SQLException e) {
+    }
+    if (rowStream != null)
+      rowStream.close();
+    rowStream = s == null ? null : new ByteArrayInputStream(s.getBytes("UTF-8"));
+    return rowStream != null;
+  }
+
+  private boolean ensureOpen() throws IOException {
+    if (rowStream == null || rowStream.available() <= 0)
+      return fetchNextRow();
+    return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+    return ensureOpen() ? rowStream.read() : -1;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (!ensureOpen())
+      return -1;
+
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+
+    int total = 0;
+    int n = rowStream.read(b, off, len);
+    total += n;
+    while (n != -1 && total < len) {
+      if (!fetchNextRow())
+        return total;
+      n = rowStream.read(b, off + total, len - total);
+      total += n;
+    }
+    return total;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (rowStream != null)
+      rowStream.close();
+    rowStream = null;
+
+    try {
+      res.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    if (rowStream == null)
+      return 0;
+    return rowStream.available();
+  }
+
+  @Override
+  public synchronized void mark(int readlimit) {
+    throw new UnsupportedOperationException("Mark not supported");
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    throw new UnsupportedOperationException("Reset not supported");
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+}

Added: oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResult.java
URL: http://svn.apache.org/viewvc/oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResult.java?rev=1291587&view=auto
==============================================================================
--- oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResult.java (added)
+++ oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResult.java Tue Feb 21 04:21:50 2012
@@ -0,0 +1,105 @@
+package org.apache.oodt.xmlps.structs;
+
+import static org.easymock.EasyMock.expect;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+import junit.framework.TestCase;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+
+public class TestCDEResult extends TestCase {
+
+  private static final String ID = "123";
+  private static final String LAST = "doe";
+  private static final String FIRST = "john";
+  private static final String DOB = "1980-01-01 00:00:00.0";
+
+  private static final String ID1 = "321";
+  private static final String LAST1 = "smith";
+  private static final String FIRST1 = "jane";
+  private static final String DOB1 = "1990-01-01 00:00:00.0";
+
+  private static final String FS = "\t";
+  private static final String RS = "$";
+
+  private ResultSet rs;
+  private Connection con;
+  private ResultSetMetaData rsmet;
+  private CDEResult result;
+  private IMocksControl ctrl;
+
+  @Override
+  protected void setUp() throws Exception {
+    ctrl = EasyMock.createNiceControl();
+    rs = ctrl.createMock(ResultSet.class);
+    con = ctrl.createMock(Connection.class);
+    rsmet = ctrl.createMock(ResultSetMetaData.class);
+    expect(rs.next()).andReturn(true);
+    expect(rsmet.getColumnCount()).andReturn(4).anyTimes();
+    expect(rs.getMetaData()).andReturn(rsmet).anyTimes();
+    expect(rs.getString(1)).andReturn(ID);
+    expect(rs.getString(2)).andReturn(LAST);
+    expect(rs.getString(3)).andReturn(FIRST);
+    expect(rs.getString(4)).andReturn(DOB);
+    expect(rs.next()).andReturn(true);
+    expect(rs.getString(1)).andReturn(ID1);
+    expect(rs.getString(2)).andReturn(LAST1);
+    expect(rs.getString(3)).andReturn(FIRST1);
+    expect(rs.getString(4)).andReturn(DOB1);
+    expect(rs.next()).andReturn(false);
+    ctrl.replay();
+    result = new CDEResult(rs,con);
+  }
+
+  public void testGetInputStream() {
+    InputStream in = null;
+    try {
+      in = result.getInputStream();
+    } catch (IOException e) {
+      fail("Could not get inputstream: " + e.getMessage());
+    }
+    assertNotNull(in);
+    assertEquals(CDEResultInputStream.class, in.getClass());
+
+    boolean thrown = false;
+    try {
+      in = new CDEResult(null, con).getInputStream();
+    } catch (IOException e) {
+      thrown = true;
+    }
+    assertTrue("InputStream should throw IOException with null ResultSet!", thrown);
+
+    thrown = false;
+    try {
+      in = new CDEResult(rs, null).getInputStream();
+    } catch (IOException e) {
+      thrown = true;
+    }
+    assertTrue("InputStream should throw IOException with null Connection!", thrown);
+  }
+
+  public void testGetNextRowAsString() {
+    try {
+      assertEquals(ID + FS + LAST + FS + FIRST + FS + DOB + RS, result.getNextRowAsString());
+      assertEquals(ID1 + FS + LAST1 + FS + FIRST1 + FS + DOB1 + RS, result.getNextRowAsString());
+    } catch (SQLException e) {
+      fail("Could not get next row: " + e.getMessage());
+    }
+  }
+
+  public void testGetMimeType() {
+    assertEquals("text/plain", result.getMimeType());
+  }
+
+  public void testGetSize() {
+    assertEquals(-1, result.getSize());
+  }
+
+}

Added: oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResultInputStream.java
URL: http://svn.apache.org/viewvc/oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResultInputStream.java?rev=1291587&view=auto
==============================================================================
--- oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResultInputStream.java (added)
+++ oodt/trunk/xmlps/src/test/java/org/apache/oodt/xmlps/structs/TestCDEResultInputStream.java Tue Feb 21 04:21:50 2012
@@ -0,0 +1,120 @@
+package org.apache.oodt.xmlps.structs;
+
+import static org.easymock.EasyMock.expect;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+
+import junit.framework.TestCase;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+
+public class TestCDEResultInputStream extends TestCase {
+  
+  private static final String ID = "123";
+  private static final String LAST = "doe";
+  private static final String FIRST = "john";
+  private static final String DOB = "1980-01-01 00:00:00.0";
+  
+  private static final String ID1 = "321";
+  private static final String LAST1 = "smith";
+  private static final String FIRST1 = "jane";
+  private static final String DOB1 = "1990-01-01 00:00:00.0";
+  
+  private static final String FS = "\t";
+  private static final String RS = "$";
+
+  private ResultSet rs;
+  private Connection con;
+  private ResultSetMetaData rsmet;
+  private CDEResult result;
+  private CDEResultInputStream in;
+  private IMocksControl ctrl;
+  
+  @Override
+  protected void setUp() throws Exception {
+    ctrl = EasyMock.createNiceControl();
+    rs = ctrl.createMock(ResultSet.class);
+    con = ctrl.createMock(Connection.class);
+    rsmet = ctrl.createMock(ResultSetMetaData.class);
+    expect(rs.next()).andReturn(true);
+    expect(rsmet.getColumnCount()).andReturn(4).anyTimes();
+    expect(rs.getMetaData()).andReturn(rsmet).anyTimes();
+    expect(rs.getString(1)).andReturn(ID);
+    expect(rs.getString(2)).andReturn(LAST);
+    expect(rs.getString(3)).andReturn(FIRST);
+    expect(rs.getString(4)).andReturn(DOB);
+    expect(rs.next()).andReturn(true);
+    expect(rs.getString(1)).andReturn(ID1);
+    expect(rs.getString(2)).andReturn(LAST1);
+    expect(rs.getString(3)).andReturn(FIRST1);
+    expect(rs.getString(4)).andReturn(DOB1);
+    expect(rs.next()).andReturn(false);
+    ctrl.replay();
+    result = new CDEResult(rs,con);
+    in = new CDEResultInputStream(result);
+  }
+  
+ @Override
+  protected void tearDown() throws Exception {
+    in.close();
+  }
+  
+  public void testRead() {
+    String expected = ID + FS + LAST + FS + FIRST + FS + DOB + RS;
+    String expected1 = ID1 + FS + LAST1 + FS + FIRST1 + FS + DOB1 + RS;
+    try {
+      for (int i = 0; i < expected.length(); i++) {
+        assertEquals((int)expected.charAt(i), in.read());
+      }
+      for (int i = 0; i < expected1.length(); i++) {
+        assertEquals((int)expected1.charAt(i), in.read());
+      }
+      assertEquals(-1, in.read());
+    } catch (IOException e) {
+      fail("IOException: " + e.getMessage());
+    }
+  }
+  
+  public void testReadCharArrayIntInt() {
+    byte[] buf = new byte[128];
+    int n = 0;
+    int length = 0;
+    String expected = null;
+    
+    try {
+      expected = ID + FS + LAST;
+      length = expected.length();
+      n = in.read(buf, 0, length);
+      assertEquals(length, n);
+      assertEquals(expected, new String(buf, 0, n));
+
+      expected = FS + FIRST;
+      length = expected.length();
+      n = in.read(buf, 0, length);
+      assertEquals(length, n);
+      assertEquals(expected, new String(buf, 0, n));
+
+      expected = FS + DOB.substring(0, length-1);
+      length = expected.length();
+      n = in.read(buf, 0, length);
+      assertEquals(length, n);
+      assertEquals(expected, new String(buf, 0, n));
+      
+      expected = DOB.substring(4) + RS + ID1 + FS + LAST1 + FS + FIRST1 + FS + DOB1 + RS;
+      length = buf.length;
+      n = in.read(buf, 0, length);
+      assertEquals(expected.length(), n);
+      assertEquals(expected, new String(buf, 0, n));
+      
+      n = in.read(buf, 0, 10);
+      assertEquals(-1, n);
+    } catch (IOException e) {
+      fail("IOException: " + e.getMessage());
+    }
+  }
+
+}