Posted to dev@flink.apache.org by "Markus Holzemer (JIRA)" <ji...@apache.org> on 2014/07/10 12:06:05 UTC

[jira] [Created] (FLINK-1018) Logistic Regression deadlocks

Markus Holzemer created FLINK-1018:
--------------------------------------

             Summary: Logistic Regression deadlocks
                 Key: FLINK-1018
                 URL: https://issues.apache.org/jira/browse/FLINK-1018
             Project: Flink
          Issue Type: Bug
            Reporter: Markus Holzemer
         Attachments: LogisticRegression.java

We are currently running our implementation of logistic regression with batch gradient descent on the cluster.
Unfortunately, for datasets larger than 1 GB it seems to deadlock inside the iteration: the first iteration never finishes.

The iteration does a map over all points; the map gets the iteration input as a broadcast variable. The result of the map is reduced, and the result of the reducer (1 tuple) is crossed with the iteration input.
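
As a rough illustration of that data flow only (this is not the attached LogisticRegression.java and it uses no Flink API), one iteration can be sketched in plain Java. All names here (BatchGradientDescentSketch, pointGradient, sumGradients, update, step, learningRate) are hypothetical, and the mean-gradient update rule is an assumption about what the cross computes.

```java
import java.util.Arrays;

// Plain-Java stand-in for the map -> reduce -> cross step; hypothetical names throughout.
class BatchGradientDescentSketch {

    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    // "Map": gradient contribution of one point. theta plays the role of
    // the iteration input that the map receives as a broadcast variable.
    static double[] pointGradient(double[] theta, double[] x, double y) {
        double z = 0.0;
        for (int j = 0; j < theta.length; j++) {
            z += theta[j] * x[j];
        }
        double error = sigmoid(z) - y;
        double[] g = new double[theta.length];
        for (int j = 0; j < theta.length; j++) {
            g[j] = error * x[j];
        }
        return g;
    }

    // "Reduce": element-wise sum, so all contributions collapse into one tuple.
    static double[] sumGradients(double[] a, double[] b) {
        double[] s = new double[a.length];
        for (int j = 0; j < a.length; j++) {
            s[j] = a[j] + b[j];
        }
        return s;
    }

    // "Cross": combine the single reduced tuple with the iteration input.
    // Assumed update rule: theta - learningRate * (gradientSum / n).
    static double[] update(double[] theta, double[] gradientSum, int n, double learningRate) {
        double[] next = new double[theta.length];
        for (int j = 0; j < theta.length; j++) {
            next[j] = theta[j] - learningRate * gradientSum[j] / n;
        }
        return next;
    }

    // One full iteration: map over all points, reduce, then cross with theta.
    static double[] step(double[] theta, double[][] xs, double[] ys, double learningRate) {
        double[] sum = new double[theta.length];
        for (int i = 0; i < xs.length; i++) {
            sum = sumGradients(sum, pointGradient(theta, xs[i], ys[i]));
        }
        return update(theta, sum, xs.length, learningRate);
    }

    public static void main(String[] args) {
        // Tiny made-up dataset: first feature is a constant bias term.
        double[][] xs = {{1.0, 0.0}, {1.0, 1.0}, {1.0, 2.0}, {1.0, 3.0}};
        double[] ys = {0.0, 0.0, 1.0, 1.0};
        double[] theta = new double[2];
        for (int it = 0; it < 100; it++) {
            theta = step(theta, xs, ys, 0.5);
        }
        System.out.println(Arrays.toString(theta));
    }
}
```

In this sketch the output of each step has the same size as its input, which matches the observation that the data size stays constant across iterations.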

There should be no reason for the deadlock, since the data is still quite small compared to the cluster size (4 nodes with 32 GB each). Also, the data size stays constant throughout the algorithm.

Here is the generated plan. I will also attach the full algorithm.
{code}
{
	"nodes": [

	{
		"id": 2,
		"type": "source",
		"pact": "Data Source",
		"contents": "[([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.",
		"parallelism": "1",
		"subtasks_per_instance": "1",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "0.0 B" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "0.0 B" },
			{ "name": "Cumulative CPU", "value": "0.0 " }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"step_function": [
	{
		"id": 7,
		"type": "source",
		"pact": "Data Source",
		"contents": "TextInputFormat (D:/Devel/HIGGS-0.0001.csv) - UTF-8",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "83.27 MB" },
			{ "name": "Est. Cardinality", "value": "113.9. K" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "83.27 MB" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "83.27 MB" },
			{ "name": "Cumulative CPU", "value": "0.0 " }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 6,
		"type": "pact",
		"pact": "Map",
		"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$6",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"predecessors": [
			{"id": 7, "ship_strategy": "Forward"}
		],
		"driver_strategy": "Map",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "113.9. K" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "0.0 B" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "83.27 MB" },
			{ "name": "Cumulative CPU", "value": "0.0 " }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 9,
		"type": "pact",
		"pact": "Map",
		"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$1",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"predecessors": [
			{"id": 6, "ship_strategy": "Forward"}
		],
		"driver_strategy": "Map",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "113.9. K" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "0.0 B" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "41.63 MB" },
			{ "name": "Cumulative CPU", "value": "0.0 " }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 8,
		"type": "pact",
		"pact": "Reduce",
		"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2",
		"parallelism": "1",
		"subtasks_per_instance": "1",
		"predecessors": [
			{"id": 9, "ship_strategy": "Forward"}
		],
		"driver_strategy": "Reduce All",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "0.0 B" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "41.63 MB" },
			{ "name": "Cumulative CPU", "value": "0.0 " }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 10,
		"type": "pact",
		"pact": "Bulk Partial Solution",
		"contents": "Partial Solution",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "0.0 B" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "0.0 B" },
			{ "name": "Cumulative CPU", "value": "0.0 " }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 5,
		"type": "pact",
		"pact": "Map",
		"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$3",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"predecessors": [
			{"id": 6, "side": "first", "ship_strategy": "Forward", "temp_mode": "CACHED"},
			{"id": 8, "side": "second", "ship_strategy": "Broadcast"},
			{"id": 10, "side": "second", "ship_strategy": "Broadcast"}
		],
		"driver_strategy": "Map",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "113.9. K" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "(unknown)" },
			{ "name": "CPU", "value": "(unknown)" },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 4,
		"type": "pact",
		"pact": "Reduce",
		"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4",
		"parallelism": "1",
		"subtasks_per_instance": "1",
		"predecessors": [
			{"id": 5, "ship_strategy": "Forward"}
		],
		"driver_strategy": "Reduce All",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "0.0 B" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "0.0 B" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 3,
		"type": "pact",
		"pact": "Cross",
		"contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$5",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"predecessors": [
			{"id": 4, "side": "first", "ship_strategy": "Forward"},
			{"id": 10, "side": "second", "ship_strategy": "Broadcast", "temp_mode": "PIPELINE_BREAKER"}
		],
		"driver_strategy": "Nested Loops (Blocked Outer: de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4)",
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "(unknown)" },
			{ "name": "Disk I/O", "value": "(unknown)" },
			{ "name": "CPU", "value": "(unknown)" },
			{ "name": "Cumulative Network", "value": "(unknown)" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	}
		],
		"partial_solution": 10,
		"next_partial_solution": 3,
		"id": 1,
		"type": "bulk_iteration",
		"pact": "Bulk Iteration",
		"contents": "Bulk Iteration",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"predecessors": [
			{"id": 2, "ship_strategy": "Forward"}
		],
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "(unknown)" },
			{ "name": "Disk I/O", "value": "(unknown)" },
			{ "name": "CPU", "value": "(unknown)" },
			{ "name": "Cumulative Network", "value": "(unknown)" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	},
	{
		"id": 0,
		"type": "sink",
		"pact": "Data Sink",
		"contents": "TextOutputFormat (D:/Devel/theta) - UTF-8",
		"parallelism": "2",
		"subtasks_per_instance": "2",
		"predecessors": [
			{"id": 1, "ship_strategy": "Forward"}
		],
		"global_properties": [
			{ "name": "Partitioning", "value": "RANDOM" },
			{ "name": "Partitioning Order", "value": "(none)" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"local_properties": [
			{ "name": "Order", "value": "(none)" },
			{ "name": "Grouping", "value": "not grouped" },
			{ "name": "Uniqueness", "value": "not unique" }
		],
		"estimates": [
			{ "name": "Est. Output Size", "value": "(unknown)" },
			{ "name": "Est. Cardinality", "value": "(unknown)" }		],
		"costs": [
			{ "name": "Network", "value": "0.0 B" },
			{ "name": "Disk I/O", "value": "0.0 B" },
			{ "name": "CPU", "value": "0.0 " },
			{ "name": "Cumulative Network", "value": "(unknown)" },
			{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
			{ "name": "Cumulative CPU", "value": "(unknown)" }
		],
		"compiler_hints": [
			{ "name": "Output Size (bytes)", "value": "(none)" },
			{ "name": "Output Cardinality", "value": "(none)" },
			{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
			{ "name": "Filter Factor", "value": "(none)" }		]
	}
	]
}

{code}
